You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/09/05 06:35:37 UTC
[2/3] HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 61394c6..46531b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.zookeeper.KeeperException;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
@@ -91,12 +90,14 @@ public class MasterFileSystem {
private final MasterServices services;
final static PathFilter META_FILTER = new PathFilter() {
+ @Override
public boolean accept(Path p) {
return HLogUtil.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
+ @Override
public boolean accept(Path p) {
return !HLogUtil.isMetaFile(p);
}
@@ -123,14 +124,10 @@ public class MasterFileSystem {
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
- try {
- this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
- master.getConfiguration(), master, services,
- master.getServerName());
- } catch (KeeperException e) {
- throw new IOException(e);
- }
- this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+ this.splitLogManager =
+ new SplitLogManager(master, master.getConfiguration(), master, services,
+ master.getServerName());
+ this.distributedLogReplay = this.splitLogManager.isLogReplaying();
}
/**
@@ -350,11 +347,7 @@ public class MasterFileSystem {
if (regions == null || regions.isEmpty()) {
return;
}
- try {
- this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
- } catch (KeeperException e) {
- throw new IOException(e);
- }
+ this.splitLogManager.markRegionsRecovering(serverName, regions);
}
public void splitLog(final Set<ServerName> serverNames) throws IOException {
@@ -362,13 +355,13 @@ public class MasterFileSystem {
}
/**
- * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)}
+ * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
* @param failedServers
- * @throws KeeperException
+ * @throws IOException
*/
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
- throws KeeperException, InterruptedIOException {
- this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers);
+ throws IOException, InterruptedIOException {
+ this.splitLogManager.removeStaleRecoveringRegions(failedServers);
}
/**
@@ -459,7 +452,7 @@ public class MasterFileSystem {
org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
.migrateFSTableDescriptorsIfNecessary(fs, rd);
}
-
+
// Create tableinfo-s for hbase:meta if not already there.
new FSTableDescriptors(fs, rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC);
@@ -650,15 +643,10 @@ public class MasterFileSystem {
/**
* The function is used in SSH to set recovery mode based on configuration after all outstanding
* log split tasks drained.
- * @throws KeeperException
- * @throws InterruptedIOException
+ * @throws IOException
*/
public void setLogRecoveryMode() throws IOException {
- try {
this.splitLogManager.setRecoveryMode(false);
- } catch (KeeperException e) {
- throw new IOException(e);
- }
}
public RecoveryMode getLogRecoveryMode() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index b65b57e..3b59509 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -46,56 +46,42 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
+import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
/**
* Distributes the task of log splitting to the available region servers.
- * Coordination happens via zookeeper. For every log file that has to be split a
- * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
+ * Coordination happens via the coordination engine. For every log file that has to be split a
+ * task is created. SplitLogWorkers race to grab a task.
*
- * <p>SplitLogManager monitors the task znodes that it creates using the
+ * <p>SplitLogManager monitors the tasks that it creates using the
* timeoutMonitor thread. If a task's progress is slow then
- * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
- * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
- * task's znode is deleted by SplitLogManager.
+ * {@link SplitLogManagerCoordination#checkTasks} will take away the
+ * task from the owner {@link SplitLogWorker} and the task will be up for grabs again. When the
+ * task is done then it is deleted by SplitLogManager.
*
* <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
* log files. The caller thread waits in this method until all the log files
* have been split.
*
- * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
+ * <p>All the coordination calls made by this class are asynchronous. This is mainly
* to help reduce response time seen by the callers.
*
* <p>There is race in this design between the SplitLogManager and the
@@ -109,30 +95,19 @@ import com.google.common.annotations.VisibleForTesting;
* can delete the re-submission.
*/
@InterfaceAudience.Private
-public class SplitLogManager extends ZooKeeperListener {
+public class SplitLogManager {
private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
- public static final int DEFAULT_TIMEOUT = 120000;
- public static final int DEFAULT_ZK_RETRIES = 3;
- public static final int DEFAULT_MAX_RESUBMIT = 3;
- public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+ private Server server;
private final Stoppable stopper;
- private final MasterServices master;
- private final ServerName serverName;
- private final TaskFinisher taskFinisher;
private FileSystem fs;
private Configuration conf;
- private long zkretries;
- private long resubmit_threshold;
- private long timeout;
+ public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
+
private long unassignedTimeout;
private long lastTaskCreateTime = Long.MAX_VALUE;
- public boolean ignoreZKDeleteForTesting = false;
- private volatile long lastRecoveringNodeCreationTime = 0;
- // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
- // whether to GC stale recovering znodes
private long checkRecoveringTimeThreshold = 15000; // 15 seconds
private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
.synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
@@ -143,94 +118,45 @@ public class SplitLogManager extends ZooKeeperListener {
*/
protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
- private volatile RecoveryMode recoveryMode;
- private volatile boolean isDrainingDone = false;
-
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
private volatile Set<ServerName> deadWorkers = null;
private final Object deadWorkersLock = new Object();
- private Set<String> failedDeletions = null;
-
- /**
- * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
- * Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
- * that provides a task finisher for copying recovered edits to their final destination.
- * The task finisher has to be robust because it can be arbitrarily restarted or called
- * multiple times.
- *
- * @param zkw the ZK watcher
- * @param conf the HBase configuration
- * @param stopper the stoppable in case anything is wrong
- * @param master the master services
- * @param serverName the master server name
- * @throws KeeperException
- * @throws InterruptedIOException
- */
- public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
- Stoppable stopper, MasterServices master, ServerName serverName)
- throws InterruptedIOException, KeeperException {
- this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
- @Override
- public Status finish(ServerName workerName, String logfile) {
- try {
- HLogSplitter.finishSplitLogFile(logfile, conf);
- } catch (IOException e) {
- LOG.warn("Could not finish splitting of log file " + logfile, e);
- return Status.ERR;
- }
- return Status.DONE;
- }
- });
- }
-
/**
* Its OK to construct this object even when region-servers are not online. It does lookup the
- * orphan tasks in zk but it doesn't block waiting for them to be done.
- * @param zkw the ZK watcher
+ * orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
+ * @param server the server instance
* @param conf the HBase configuration
* @param stopper the stoppable in case anything is wrong
* @param master the master services
* @param serverName the master server name
- * @param tf task finisher
- * @throws KeeperException
- * @throws InterruptedIOException
+ * @throws IOException
*/
- public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper,
- MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException,
- KeeperException {
- super(zkw);
- this.taskFinisher = tf;
+ public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
+ MasterServices master, ServerName serverName) throws IOException {
+ this.server = server;
this.conf = conf;
this.stopper = stopper;
- this.master = master;
- this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
- this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
- this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
+ if (server.getCoordinatedStateManager() != null) {
+ SplitLogManagerCoordination coordination =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination();
+ Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
+ SplitLogManagerDetails details =
+ new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
+ coordination.init();
+ coordination.setDetails(details);
+ // Determine recovery mode
+ }
this.unassignedTimeout =
- conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-
- // Determine recovery mode
- setRecoveryMode(true);
-
- LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
- ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
-
- this.serverName = serverName;
- this.timeoutMonitor = new TimeoutMonitor(
- conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
-
- this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
-
+ conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
+ this.timeoutMonitor =
+ new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
+ stopper);
Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
- + ".splitLogManagerTimeoutMonitor");
- // Watcher can be null during tests with Mock'd servers.
- if (this.watcher != null) {
- this.watcher.registerListener(this);
- lookForOrphans();
- }
+ + ".splitLogManagerTimeoutMonitor");
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
@@ -254,10 +180,8 @@ public class SplitLogManager extends ZooKeeperListener {
}
/**
- * @param logDir
- * one region sever hlog dir path in .logs
- * @throws IOException
- * if there was an error while splitting any log file
+ * @param logDir one region server hlog dir path in .logs
+ * @throws IOException if there was an error while splitting any log file
* @return cumulative size of the logfiles split
* @throws IOException
*/
@@ -268,11 +192,9 @@ public class SplitLogManager extends ZooKeeperListener {
}
/**
- * The caller will block until all the log files of the given region server
- * have been processed - successfully split or an error is encountered - by an
- * available worker region server. This method must only be called after the
- * region servers have been brought online.
- *
+ * The caller will block until all the log files of the given region server have been processed -
+ * successfully split or an error is encountered - by an available worker region server. This
+ * method must only be called after the region servers have been brought online.
* @param logDirs List of log dirs to split
* @throws IOException If there was an error while splitting any log file
* @return cumulative size of the logfiles split
@@ -297,11 +219,9 @@ public class SplitLogManager extends ZooKeeperListener {
}
/**
- * The caller will block until all the hbase:meta log files of the given region server
- * have been processed - successfully split or an error is encountered - by an
- * available worker region server. This method must only be called after the
- * region servers have been brought online.
- *
+ * The caller will block until all the hbase:meta log files of the given region server have been
+ * processed - successfully split or an error is encountered - by an available worker region
+ * server. This method must only be called after the region servers have been brought online.
* @param logDirs List of log dirs to split
* @param filter the Path filter to select specific files for considering
* @throws IOException If there was an error while splitting any log file
@@ -309,8 +229,8 @@ public class SplitLogManager extends ZooKeeperListener {
*/
public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
PathFilter filter) throws IOException {
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Doing distributed log split in " + logDirs);
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Doing distributed log split in " + logDirs);
FileStatus[] logfiles = getFileList(logDirs, filter);
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
@@ -333,25 +253,24 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
waitForSplittingCompletion(batch, status);
- // remove recovering regions from ZK
+ // remove recovering regions
if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
// we split meta regions and user regions separately therefore logfiles are either all for
// meta or user regions but won't for both( we could have mixed situations in tests)
isMetaRecovery = true;
}
- this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
+ removeRecoveringRegions(serverNames, isMetaRecovery);
if (batch.done != batch.installed) {
batch.isDead = true;
SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
- LOG.warn("error while splitting logs in " + logDirs +
- " installed = " + batch.installed + " but only " + batch.done + " done");
- String msg = "error or interrupted while splitting logs in "
- + logDirs + " Task = " + batch;
+ LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
+ + " but only " + batch.done + " done");
+ String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
status.abort(msg);
throw new IOException(msg);
}
- for(Path logDir: logDirs){
+ for (Path logDir : logDirs) {
status.setStatus("Cleaning up log directory...");
try {
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
@@ -360,39 +279,39 @@ public class SplitLogManager extends ZooKeeperListener {
} catch (IOException ioe) {
FileStatus[] files = fs.listStatus(logDir);
if (files != null && files.length > 0) {
- LOG.warn("returning success without actually splitting and " +
- "deleting all the log files in path " + logDir);
+ LOG.warn("returning success without actually splitting and "
+ + "deleting all the log files in path " + logDir);
} else {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
}
}
SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
}
- String msg = "finished splitting (more than or equal to) " + totalSize +
- " bytes in " + batch.installed + " log files in " + logDirs + " in " +
- (EnvironmentEdgeManager.currentTime() - t) + "ms";
+ String msg =
+ "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
+ + " log files in " + logDirs + " in "
+ + (EnvironmentEdgeManager.currentTime() - t) + "ms";
status.markComplete(msg);
LOG.info(msg);
return totalSize;
}
/**
- * Add a task entry to splitlog znode if it is not already there.
- *
+ * Add a task entry to coordination if it is not already there.
* @param taskname the path of the log to be split
* @param batch the batch this task belongs to
* @return true if a new entry is created, false if it is already there.
*/
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
- SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
- // This is a znode path under the splitlog dir with the rest of the path made up of an
- // url encoding of the passed in log to split.
- String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
- Task oldtask = createTaskIfAbsent(path, batch);
+ String task =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().prepareTask(taskname);
+ Task oldtask = createTaskIfAbsent(task, batch);
if (oldtask == null) {
- // publish the task in zk
- createNode(path, zkretries);
+ // publish the task in the coordination engine
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().submitTask(task);
return true;
}
return false;
@@ -402,26 +321,25 @@ public class SplitLogManager extends ZooKeeperListener {
synchronized (batch) {
while ((batch.done + batch.error) != batch.installed) {
try {
- status.setStatus("Waiting for distributed tasks to finish. "
- + " scheduled=" + batch.installed
- + " done=" + batch.done
- + " error=" + batch.error);
+ status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
+ + batch.installed + " done=" + batch.done + " error=" + batch.error);
int remaining = batch.installed - (batch.done + batch.error);
int actual = activeTasks(batch);
if (remaining != actual) {
- LOG.warn("Expected " + remaining
- + " active tasks, but actually there are " + actual);
+ LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
}
- int remainingInZK = remainingTasksInZK();
- if (remainingInZK >= 0 && actual > remainingInZK) {
- LOG.warn("Expected at least" + actual
- + " tasks in ZK, but actually there are " + remainingInZK);
+ int remainingTasks =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().remainingTasksInCoordination();
+ if (remainingTasks >= 0 && actual > remainingTasks) {
+ LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
+ + remainingTasks);
}
- if (remainingInZK == 0 || actual == 0) {
- LOG.warn("No more task remaining (ZK or task map), splitting "
- + "should have completed. Remaining tasks in ZK " + remainingInZK
- + ", active tasks in map " + actual);
- if (remainingInZK == 0 && actual == 0) {
+ if (remainingTasks == 0 || actual == 0) {
+ LOG.warn("No more task remaining, splitting "
+ + "should have completed. Remaining tasks is " + remainingTasks
+ + ", active tasks in map " + actual);
+ if (remainingTasks == 0 && actual == 0) {
return;
}
}
@@ -446,31 +364,13 @@ public class SplitLogManager extends ZooKeeperListener {
private int activeTasks(final TaskBatch batch) {
int count = 0;
- for (Task t: tasks.values()) {
+ for (Task t : tasks.values()) {
if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
count++;
}
}
return count;
- }
- private int remainingTasksInZK() {
- int count = 0;
- try {
- List<String> tasks =
- ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
- if (tasks != null) {
- for (String t: tasks) {
- if (!ZKSplitLog.isRescanNode(watcher, t)) {
- count++;
- }
- }
- }
- } catch (KeeperException ke) {
- LOG.warn("Failed to check remaining tasks", ke);
- count = -1;
- }
- return count;
}
/**
@@ -480,15 +380,12 @@ public class SplitLogManager extends ZooKeeperListener {
* @param isMetaRecovery whether current recovery is for the meta region on
* <code>serverNames<code>
*/
- private void
- removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
- if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
+ private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
+ if (!isLogReplaying()) {
// the function is only used in WALEdit direct replay mode
return;
}
- final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
- int count = 0;
Set<String> recoveredServerNameSet = new HashSet<String>();
if (serverNames != null) {
for (ServerName tmpServerName : serverNames) {
@@ -498,56 +395,11 @@ public class SplitLogManager extends ZooKeeperListener {
try {
this.recoveringRegionLock.lock();
-
- List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
- if (tasks != null) {
- for (String t : tasks) {
- if (!ZKSplitLog.isRescanNode(watcher, t)) {
- count++;
- }
- }
- }
- if (count == 0 && this.master.isInitialized()
- && !this.master.getServerManager().areDeadServersInProgress()) {
- // no splitting work items left
- deleteRecoveringRegionZNodes(watcher, null);
- // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
- // this point.
- lastRecoveringNodeCreationTime = Long.MAX_VALUE;
- } else if (!recoveredServerNameSet.isEmpty()) {
- // remove recovering regions which doesn't have any RS associated with it
- List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
- if (regions != null) {
- for (String region : regions) {
- if(isMetaRecovery != null) {
- if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
- || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
- // skip non-meta regions when recovering the meta region or
- // skip the meta region when recovering user regions
- continue;
- }
- }
- String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
- List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
- if (failedServers == null || failedServers.isEmpty()) {
- ZKUtil.deleteNode(watcher, nodePath);
- continue;
- }
- if (recoveredServerNameSet.containsAll(failedServers)) {
- ZKUtil.deleteNodeRecursively(watcher, nodePath);
- } else {
- for (String failedServer : failedServers) {
- if (recoveredServerNameSet.contains(failedServer)) {
- String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
- ZKUtil.deleteNode(watcher, tmpPath);
- }
- }
- }
- }
- }
- }
- } catch (KeeperException ke) {
- LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
+ isMetaRecovery);
+ } catch (IOException e) {
+ LOG.warn("removeRecoveringRegions got exception. Will retry", e);
if (serverNames != null && !serverNames.isEmpty()) {
this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
isMetaRecovery));
@@ -561,11 +413,10 @@ public class SplitLogManager extends ZooKeeperListener {
* It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
* during master initialization phase.
* @param failedServers A set of known failed servers
- * @throws KeeperException
+ * @throws IOException
*/
- void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
- throws KeeperException, InterruptedIOException {
-
+ void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
+ InterruptedIOException {
Set<String> knownFailedServers = new HashSet<String>();
if (failedServers != null) {
for (ServerName tmpServerName : failedServers) {
@@ -575,406 +426,13 @@ public class SplitLogManager extends ZooKeeperListener {
this.recoveringRegionLock.lock();
try {
- List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
- if (tasks != null) {
- for (String t : tasks) {
- byte[] data;
- try {
- data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if (data != null) {
- SplitLogTask slt = null;
- try {
- slt = SplitLogTask.parseFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse data for znode " + t, e);
- }
- if (slt != null && slt.isDone()) {
- continue;
- }
- }
- // decode the file name
- t = ZKSplitLog.getFileName(t);
- ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
- if (serverName != null) {
- knownFailedServers.add(serverName.getServerName());
- } else {
- LOG.warn("Found invalid WAL log file name:" + t);
- }
- }
- }
-
- // remove recovering regions which doesn't have any RS associated with it
- List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
- if (regions != null) {
- for (String region : regions) {
- String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
- List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
- if (regionFailedServers == null || regionFailedServers.isEmpty()) {
- ZKUtil.deleteNode(watcher, nodePath);
- continue;
- }
- boolean needMoreRecovery = false;
- for (String tmpFailedServer : regionFailedServers) {
- if (knownFailedServers.contains(tmpFailedServer)) {
- needMoreRecovery = true;
- break;
- }
- }
- if (!needMoreRecovery) {
- ZKUtil.deleteNodeRecursively(watcher, nodePath);
- }
- }
- }
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
} finally {
this.recoveringRegionLock.unlock();
}
}
- public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
- try {
- if (regions == null) {
- // remove all children under /home/recovering-regions
- LOG.debug("Garbage collecting all recovering region znodes");
- ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
- } else {
- for (String curRegion : regions) {
- String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
- ZKUtil.deleteNodeRecursively(watcher, nodePath);
- }
- }
- } catch (KeeperException e) {
- LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
- }
- }
-
- private void setDone(String path, TerminationStatus status) {
- Task task = tasks.get(path);
- if (task == null) {
- if (!ZKSplitLog.isRescanNode(watcher, path)) {
- SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
- LOG.debug("unacquired orphan task is done " + path);
- }
- } else {
- synchronized (task) {
- if (task.status == IN_PROGRESS) {
- if (status == SUCCESS) {
- SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
- LOG.info("Done splitting " + path);
- } else {
- SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
- LOG.warn("Error splitting " + path);
- }
- task.status = status;
- if (task.batch != null) {
- synchronized (task.batch) {
- if (status == SUCCESS) {
- task.batch.done++;
- } else {
- task.batch.error++;
- }
- task.batch.notify();
- }
- }
- }
- }
- }
- // delete the task node in zk. It's an async
- // call and no one is blocked waiting for this node to be deleted. All
- // task names are unique (log.<timestamp>) there is no risk of deleting
- // a future task.
- // if a deletion fails, TimeoutMonitor will retry the same deletion later
- deleteNode(path, zkretries);
- return;
- }
-
- private void createNode(String path, Long retry_count) {
- SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
- ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
- SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
- return;
- }
-
- private void createNodeSuccess(String path) {
- LOG.debug("put up splitlog task at znode " + path);
- getDataSetWatch(path, zkretries);
- }
-
- private void createNodeFailure(String path) {
- // TODO the Manager should split the log locally instead of giving up
- LOG.warn("failed to create task node" + path);
- setDone(path, FAILURE);
- }
-
-
- private void getDataSetWatch(String path, Long retry_count) {
- this.watcher.getRecoverableZooKeeper().getZooKeeper().
- getData(path, this.watcher,
- new GetDataAsyncCallback(), retry_count);
- SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
- }
-
- private void tryGetDataSetWatch(String path) {
- // A negative retry count will lead to ignoring all error processing.
- this.watcher.getRecoverableZooKeeper().getZooKeeper().
- getData(path, this.watcher,
- new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
- SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
- }
-
- private void getDataSetWatchSuccess(String path, byte[] data, int version)
- throws DeserializationException {
- if (data == null) {
- if (version == Integer.MIN_VALUE) {
- // assume all done. The task znode suddenly disappeared.
- setDone(path, SUCCESS);
- return;
- }
- SplitLogCounters.tot_mgr_null_data.incrementAndGet();
- LOG.fatal("logic error - got null data " + path);
- setDone(path, FAILURE);
- return;
- }
- data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
- SplitLogTask slt = SplitLogTask.parseFrom(data);
- if (slt.isUnassigned()) {
- LOG.debug("task not yet acquired " + path + " ver = " + version);
- handleUnassignedTask(path);
- } else if (slt.isOwned()) {
- heartbeat(path, version, slt.getServerName());
- } else if (slt.isResigned()) {
- LOG.info("task " + path + " entered state: " + slt.toString());
- resubmitOrFail(path, FORCE);
- } else if (slt.isDone()) {
- LOG.info("task " + path + " entered state: " + slt.toString());
- if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
- if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
- setDone(path, SUCCESS);
- } else {
- resubmitOrFail(path, CHECK);
- }
- } else {
- setDone(path, SUCCESS);
- }
- } else if (slt.isErr()) {
- LOG.info("task " + path + " entered state: " + slt.toString());
- resubmitOrFail(path, CHECK);
- } else {
- LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
- setDone(path, FAILURE);
- }
- }
-
- private void getDataSetWatchFailure(String path) {
- LOG.warn("failed to set data watch " + path);
- setDone(path, FAILURE);
- }
-
- /**
- * It is possible for a task to stay in UNASSIGNED state indefinitely - say
- * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
- * state but it dies before it could create the RESCAN task node to signal
- * the SplitLogWorkers to pick up the task. To prevent this scenario the
- * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
- *
- * @param path
- */
- private void handleUnassignedTask(String path) {
- if (ZKSplitLog.isRescanNode(watcher, path)) {
- return;
- }
- Task task = findOrCreateOrphanTask(path);
- if (task.isOrphan() && (task.incarnation == 0)) {
- LOG.info("resubmitting unassigned orphan task " + path);
- // ignore failure to resubmit. The timeout-monitor will handle it later
- // albeit in a more crude fashion
- resubmit(path, task, FORCE);
- }
- }
-
- /**
- * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
- * @param statusCode integer value of a ZooKeeper exception code
- * @param action description message about the retried action
- * @return true when need to abandon retries otherwise false
- */
- private boolean needAbandonRetries(int statusCode, String action) {
- if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
- LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
- + "action=" + action);
- return true;
- }
- return false;
- }
-
- private void heartbeat(String path, int new_version, ServerName workerName) {
- Task task = findOrCreateOrphanTask(path);
- if (new_version != task.last_version) {
- if (task.isUnassigned()) {
- LOG.info("task " + path + " acquired by " + workerName);
- }
- task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
- SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
- } else {
- // duplicate heartbeats - heartbeats w/o zk node version
- // changing - are possible. The timeout thread does
- // getDataSetWatch() just to check whether a node still
- // exists or not
- }
- return;
- }
-
- private boolean resubmit(String path, Task task, ResubmitDirective directive) {
- // its ok if this thread misses the update to task.deleted. It will fail later
- if (task.status != IN_PROGRESS) {
- return false;
- }
- int version;
- if (directive != FORCE) {
- // We're going to resubmit:
- // 1) immediately if the worker server is now marked as dead
- // 2) after a configurable timeout if the server is not marked as dead but has still not
- // finished the task. This allows to continue if the worker cannot actually handle it,
- // for any reason.
- final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
- final boolean alive = master.getServerManager() != null ?
- master.getServerManager().isServerOnline(task.cur_worker_name) : true;
- if (alive && time < timeout) {
- LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " +
- task.cur_worker_name + " is not marked as dead, we waited for " + time +
- " while the timeout is " + timeout);
- return false;
- }
- if (task.unforcedResubmits.get() >= resubmit_threshold) {
- if (!task.resubmitThresholdReached) {
- task.resubmitThresholdReached = true;
- SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
- LOG.info("Skipping resubmissions of task " + path +
- " because threshold " + resubmit_threshold + " reached");
- }
- return false;
- }
- // race with heartbeat() that might be changing last_version
- version = task.last_version;
- } else {
- SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
- version = -1;
- }
- LOG.info("resubmitting task " + path);
- task.incarnation++;
- try {
- // blocking zk call but this is done from the timeout thread
- SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
- if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
- LOG.debug("failed to resubmit task " + path +
- " version changed");
- task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
- return false;
- }
- } catch (NoNodeException e) {
- LOG.warn("failed to resubmit because znode doesn't exist " + path +
- " task done (or forced done by removing the znode)");
- try {
- getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
- } catch (DeserializationException e1) {
- LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
- task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
- return false;
- }
- return false;
- } catch (KeeperException.BadVersionException e) {
- LOG.debug("failed to resubmit task " + path + " version changed");
- task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
- return false;
- } catch (KeeperException e) {
- SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
- LOG.warn("failed to resubmit " + path, e);
- return false;
- }
- // don't count forced resubmits
- if (directive != FORCE) {
- task.unforcedResubmits.incrementAndGet();
- }
- task.setUnassigned();
- createRescanNode(Long.MAX_VALUE);
- SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
- return true;
- }
-
- private void resubmitOrFail(String path, ResubmitDirective directive) {
- if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
- setDone(path, FAILURE);
- }
- }
-
- private void deleteNode(String path, Long retries) {
- SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
- // Once a task znode is ready for delete, that is it is in the TASK_DONE
- // state, then no one should be writing to it anymore. That is no one
- // will be updating the znode version any more.
- this.watcher.getRecoverableZooKeeper().getZooKeeper().
- delete(path, -1, new DeleteAsyncCallback(),
- retries);
- }
-
- private void deleteNodeSuccess(String path) {
- if (ignoreZKDeleteForTesting) {
- return;
- }
- Task task;
- task = tasks.remove(path);
- if (task == null) {
- if (ZKSplitLog.isRescanNode(watcher, path)) {
- SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
- }
- SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
- LOG.debug("deleted task without in memory state " + path);
- return;
- }
- synchronized (task) {
- task.status = DELETED;
- task.notify();
- }
- SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
- }
-
- private void deleteNodeFailure(String path) {
- LOG.info("Failed to delete node " + path + " and will retry soon.");
- return;
- }
-
- /**
- * signal the workers that a task was resubmitted by creating the
- * RESCAN node.
- * @throws KeeperException
- */
- private void createRescanNode(long retries) {
- // The RESCAN node will be deleted almost immediately by the
- // SplitLogManager as soon as it is created because it is being
- // created in the DONE state. This behavior prevents a buildup
- // of RESCAN nodes. But there is also a chance that a SplitLogWorker
- // might miss the watch-trigger that creation of RESCAN node provides.
- // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
- // therefore this behavior is safe.
- lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
- SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
- this.watcher.getRecoverableZooKeeper().getZooKeeper().
- create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
- new CreateRescanAsyncCallback(), Long.valueOf(retries));
- }
-
- private void createRescanSuccess(String path) {
- SplitLogCounters.tot_mgr_rescan.incrementAndGet();
- getDataSetWatch(path, zkretries);
- }
-
- private void createRescanFailure() {
- LOG.fatal("logic failure, rescan failure must not happen");
- }
-
/**
* @param path
* @param batch
@@ -989,7 +447,7 @@ public class SplitLogManager extends ZooKeeperListener {
oldtask = tasks.putIfAbsent(path, newtask);
if (oldtask == null) {
batch.installed++;
- return null;
+ return null;
}
// new task was not used.
synchronized (oldtask) {
@@ -1020,16 +478,15 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
if (oldtask.status != DELETED) {
- LOG.warn("Failure because previously failed task" +
- " state still present. Waiting for znode delete callback" +
- " path=" + path);
+ LOG.warn("Failure because previously failed task"
+ + " state still present. Waiting for znode delete callback" + " path=" + path);
return oldtask;
}
// reinsert the newTask and it must succeed this time
Task t = tasks.putIfAbsent(path, newtask);
if (t == null) {
batch.installed++;
- return null;
+ return null;
}
LOG.fatal("Logic error. Deleted task still present in tasks map");
assert false : "Deleted task still present in tasks map";
@@ -1052,308 +509,86 @@ public class SplitLogManager extends ZooKeeperListener {
return task;
}
- @Override
- public void nodeDataChanged(String path) {
- Task task;
- task = tasks.get(path);
- if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
- if (task != null) {
- task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
- }
- getDataSetWatch(path, zkretries);
- }
- }
-
public void stop() {
if (timeoutMonitor != null) {
timeoutMonitor.interrupt();
}
}
- private void lookForOrphans() {
- List<String> orphans;
- try {
- orphans = ZKUtil.listChildrenNoWatch(this.watcher,
- this.watcher.splitLogZNode);
- if (orphans == null) {
- LOG.warn("could not get children of " + this.watcher.splitLogZNode);
- return;
+ void handleDeadWorker(ServerName workerName) {
+ // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
+ // to reason about concurrency. Makes it easier to retry.
+ synchronized (deadWorkersLock) {
+ if (deadWorkers == null) {
+ deadWorkers = new HashSet<ServerName>(100);
}
- } catch (KeeperException e) {
- LOG.warn("could not get children of " + this.watcher.splitLogZNode +
- " " + StringUtils.stringifyException(e));
- return;
+ deadWorkers.add(workerName);
}
- int rescan_nodes = 0;
- for (String path : orphans) {
- String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
- if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
- rescan_nodes++;
- LOG.debug("found orphan rescan node " + path);
- } else {
- LOG.info("found orphan task " + path);
+ LOG.info("dead splitlog worker " + workerName);
+ }
+
+ void handleDeadWorkers(Set<ServerName> serverNames) {
+ synchronized (deadWorkersLock) {
+ if (deadWorkers == null) {
+ deadWorkers = new HashSet<ServerName>(100);
}
- getDataSetWatch(nodepath, zkretries);
+ deadWorkers.addAll(serverNames);
}
- LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
- rescan_nodes + " rescan nodes");
+ LOG.info("dead splitlog workers " + serverNames);
}
/**
- * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
- * all regions of the passed in region servers
- * @param serverName the name of a region server
- * @param userRegions user regiones assigned on the region server
+ * This function is to set recovery mode from outstanding split log tasks from before or current
+ * configuration setting
+ * @param isForInitialization
+ * @throws IOException throws if it's impossible to set recovery mode
*/
- void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
- throws KeeperException, InterruptedIOException {
- if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
+ public void setRecoveryMode(boolean isForInitialization) throws IOException {
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
+
+ }
+
+ public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
+ throws InterruptedIOException, IOException {
+ if (userRegions == null || (!isLogReplaying())) {
return;
}
-
try {
this.recoveringRegionLock.lock();
- // mark that we're creating recovering znodes
- this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTime();
-
- for (HRegionInfo region : userRegions) {
- String regionEncodeName = region.getEncodedName();
- long retries = this.zkretries;
-
- do {
- String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
- long lastRecordedFlushedSequenceId = -1;
- try {
- long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
- regionEncodeName.getBytes());
-
- /*
- * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
- * flushed sequence id for the server]
- */
- byte[] data = ZKUtil.getData(this.watcher, nodePath);
- if (data == null) {
- ZKUtil.createSetData(this.watcher, nodePath,
- ZKUtil.positionToByteArray(lastSequenceId));
- } else {
- lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
- if (lastRecordedFlushedSequenceId < lastSequenceId) {
- // update last flushed sequence id in the region level
- ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
- }
- }
- // go one level deeper with server name
- nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
- if (lastSequenceId <= lastRecordedFlushedSequenceId) {
- // the newly assigned RS failed even before any flush to the region
- lastSequenceId = lastRecordedFlushedSequenceId;
- }
- ZKUtil.createSetData(this.watcher, nodePath,
- ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
- LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
- + serverName);
-
- // break retry loop
- break;
- } catch (KeeperException e) {
- // ignore ZooKeeper exceptions inside retry loop
- if (retries <= 1) {
- throw e;
- }
- // wait a little bit for retry
- try {
- Thread.sleep(20);
- } catch (InterruptedException e1) {
- throw new InterruptedIOException();
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- } while ((--retries) > 0 && (!this.stopper.isStopped()));
- }
+ // mark that we're creating recovering regions
+ ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
} finally {
this.recoveringRegionLock.unlock();
}
- }
- /**
- * @param bytes - Content of a failed region server or recovering region znode.
- * @return long - The last flushed sequence Id for the region server
- */
- public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
- long lastRecordedFlushedSequenceId = -1l;
- try {
- lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
- } catch (DeserializationException e) {
- lastRecordedFlushedSequenceId = -1l;
- LOG.warn("Can't parse last flushed sequence Id", e);
- }
- return lastRecordedFlushedSequenceId;
}
/**
- * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
- * and set watcher as well.
- * @param zkw
- * @param regionEncodedName region encode name
- * @return true when /hbase/recovering-regions/<current region encoded name> exists
- * @throws KeeperException
+ * @return whether log is replaying
*/
- public static boolean
- isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
- throws KeeperException {
- boolean result = false;
- String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
-
- byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
- if (node != null) {
- result = true;
- }
- return result;
+ public boolean isLogReplaying() {
+ if (server.getCoordinatedStateManager() == null) return false;
+ return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().isReplaying();
}
/**
- * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
- * @param zkw
- * @param serverName
- * @param encodedRegionName
- * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code>
- * @throws IOException
+ * @return whether log is splitting
*/
- public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
- String serverName, String encodedRegionName) throws IOException {
- // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
- // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
- // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
- // sequence Id name space (sequence Id only valid for a particular RS instance), changes
- // when different newly assigned RS flushes the region.
- // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
- // last flushed sequence Id for each failed RS instance.
- RegionStoreSequenceIds result = null;
- String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
- nodePath = ZKUtil.joinZNode(nodePath, serverName);
- try {
- byte[] data;
- try {
- data = ZKUtil.getData(zkw, nodePath);
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if (data != null) {
- result = ZKUtil.parseRegionStoreSequenceIds(data);
- }
- } catch (KeeperException e) {
- throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
- + serverName + "; region=" + encodedRegionName, e);
- } catch (DeserializationException e) {
- LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
- }
- return result;
+ public boolean isLogSplitting() {
+ if (server.getCoordinatedStateManager() == null) return false;
+ return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().isSplitting();
}
/**
- * This function is to set recovery mode from outstanding split log tasks from before or
- * current configuration setting
- * @param isForInitialization
- * @throws KeeperException
- * @throws InterruptedIOException
+ * @return the current log recovery mode
*/
- public void setRecoveryMode(boolean isForInitialization) throws KeeperException,
- InterruptedIOException {
- if(this.isDrainingDone) {
- // when there is no outstanding splitlogtask after master start up, we already have up to date
- // recovery mode
- return;
- }
- if(this.watcher == null) {
- // when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING
- this.isDrainingDone = true;
- this.recoveryMode = RecoveryMode.LOG_SPLITTING;
- return;
- }
- boolean hasSplitLogTask = false;
- boolean hasRecoveringRegions = false;
- RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
- RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
- RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
-
- // Firstly check if there are outstanding recovering regions
- List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
- if (regions != null && !regions.isEmpty()) {
- hasRecoveringRegions = true;
- previousRecoveryMode = RecoveryMode.LOG_REPLAY;
- }
- if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
- // Secondly check if there are outstanding split log task
- List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
- if (tasks != null && !tasks.isEmpty()) {
- hasSplitLogTask = true;
- if (isForInitialization) {
- // during initialization, try to get recovery mode from splitlogtask
- for (String task : tasks) {
- try {
- byte[] data = ZKUtil.getData(this.watcher,
- ZKUtil.joinZNode(watcher.splitLogZNode, task));
- if (data == null) continue;
- SplitLogTask slt = SplitLogTask.parseFrom(data);
- previousRecoveryMode = slt.getMode();
- if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
- // created by old code base where we don't set recovery mode in splitlogtask
- // we can safely set to LOG_SPLITTING because we're in master initialization code
- // before SSH is enabled & there is no outstanding recovering regions
- previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
- }
- break;
- } catch (DeserializationException e) {
- LOG.warn("Failed parse data for znode " + task, e);
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- }
- }
- }
- }
-
- synchronized(this) {
- if(this.isDrainingDone) {
- return;
- }
- if (!hasSplitLogTask && !hasRecoveringRegions) {
- this.isDrainingDone = true;
- this.recoveryMode = recoveryModeInConfig;
- return;
- } else if (!isForInitialization) {
- // splitlogtask hasn't drained yet, keep existing recovery mode
- return;
- }
-
- if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
- this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
- this.recoveryMode = previousRecoveryMode;
- } else {
- this.recoveryMode = recoveryModeInConfig;
- }
- }
- }
-
public RecoveryMode getRecoveryMode() {
- return this.recoveryMode;
- }
-
- /**
- * Returns if distributed log replay is turned on or not
- * @param conf
- * @return true when distributed log replay is turned on
- */
- private boolean isDistributedLogReplay(Configuration conf) {
- boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
- HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
- int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
- }
- // For distributed log replay, hfile version must be 3 at least; we need tag support.
- return dlr && (version >= 3);
+ return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().getRecoveryMode();
}
/**
@@ -1362,11 +597,12 @@ public class SplitLogManager extends ZooKeeperListener {
* <p>
* All access is synchronized.
*/
- static class TaskBatch {
- int installed = 0;
- int done = 0;
- int error = 0;
- volatile boolean isDead = false;
+ @InterfaceAudience.Private
+ public static class TaskBatch {
+ public int installed = 0;
+ public int done = 0;
+ public int error = 0;
+ public volatile boolean isDead = false;
@Override
public String toString() {
@@ -1377,28 +613,25 @@ public class SplitLogManager extends ZooKeeperListener {
/**
* in memory state of an active task.
*/
- static class Task {
- volatile long last_update;
- volatile int last_version;
- volatile ServerName cur_worker_name;
- volatile TaskBatch batch;
- volatile TerminationStatus status;
- volatile int incarnation;
- final AtomicInteger unforcedResubmits = new AtomicInteger();
- volatile boolean resubmitThresholdReached;
+ @InterfaceAudience.Private
+ public static class Task {
+ public volatile long last_update;
+ public volatile int last_version;
+ public volatile ServerName cur_worker_name;
+ public volatile TaskBatch batch;
+ public volatile TerminationStatus status;
+ public volatile int incarnation;
+ public final AtomicInteger unforcedResubmits = new AtomicInteger();
+ public volatile boolean resubmitThresholdReached;
@Override
public String toString() {
- return ("last_update = " + last_update +
- " last_version = " + last_version +
- " cur_worker_name = " + cur_worker_name +
- " status = " + status +
- " incarnation = " + incarnation +
- " resubmits = " + unforcedResubmits.get() +
- " batch = " + batch);
+ return ("last_update = " + last_update + " last_version = " + last_version
+ + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
+ + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
}
- Task() {
+ public Task() {
incarnation = 0;
last_version = -1;
status = IN_PROGRESS;
@@ -1429,31 +662,8 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
- void handleDeadWorker(ServerName workerName) {
- // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
- // to reason about concurrency. Makes it easier to retry.
- synchronized (deadWorkersLock) {
- if (deadWorkers == null) {
- deadWorkers = new HashSet<ServerName>(100);
- }
- deadWorkers.add(workerName);
- }
- LOG.info("dead splitlog worker " + workerName);
- }
-
- void handleDeadWorkers(Set<ServerName> serverNames) {
- synchronized (deadWorkersLock) {
- if (deadWorkers == null) {
- deadWorkers = new HashSet<ServerName>(100);
- }
- deadWorkers.addAll(serverNames);
- }
- LOG.info("dead splitlog workers " + serverNames);
- }
-
/**
- * Periodically checks all active tasks and resubmits the ones that have timed
- * out
+ * Periodically checks all active tasks and resubmits the ones that have timed out
*/
private class TimeoutMonitor extends Chore {
private long lastLog = 0;
@@ -1492,14 +702,16 @@ public class SplitLogManager extends ZooKeeperListener {
found_assigned_task = true;
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
- if (resubmit(path, task, FORCE)) {
+ if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
resubmitted++;
} else {
handleDeadWorker(cur_worker);
- LOG.warn("Failed to resubmit task " + path + " owned by dead " +
- cur_worker + ", will retry.");
+ LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
+ + ", will retry.");
}
- } else if (resubmit(path, task, CHECK)) {
+ } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
resubmitted++;
}
}
@@ -1522,39 +734,46 @@ public class SplitLogManager extends ZooKeeperListener {
// manager will be indefinitely creating RESCAN nodes. TODO may be the
// master should spawn both a manager and a worker thread to guarantee
// that there is always one worker in the system
- if (tot > 0 && !found_assigned_task &&
- ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) >
- unassignedTimeout)) {
+ if (tot > 0
+ && !found_assigned_task
+ && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
for (Map.Entry<String, Task> e : tasks.entrySet()) {
- String path = e.getKey();
+ String key = e.getKey();
Task task = e.getValue();
// we have to do task.isUnassigned() check again because tasks might
// have been asynchronously assigned. There is no locking required
// for these checks ... it is OK even if tryGetDataSetWatch() is
- // called unnecessarily for a task
- // called unnecessarily for a task
if (task.isUnassigned() && (task.status != FAILURE)) {
// We just touch the znode to make sure its still there
- tryGetDataSetWatch(path);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
}
}
- createRescanNode(Long.MAX_VALUE);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().checkTasks();
SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout");
}
-
+ Set<String> failedDeletions =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
// Retry previously failed deletes
if (failedDeletions.size() > 0) {
List<String> tmpPaths = new ArrayList<String>(failedDeletions);
for (String tmpPath : tmpPaths) {
// deleteNode is an async call
- deleteNode(tmpPath, zkretries);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().deleteTask(tmpPath);
}
failedDeletions.removeAll(tmpPaths);
}
- // Garbage collect left-over /hbase/recovering-regions/... znode
- long timeInterval = EnvironmentEdgeManager.currentTime()
- - lastRecoveringNodeCreationTime;
+ // Garbage collect left-over recovering region records
+ long timeInterval =
+ EnvironmentEdgeManager.currentTime()
+ - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getSplitLogManagerCoordination().getLastRecoveryTime();
if (!failedRecoveringRegionDeletions.isEmpty()
|| (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
// inside the function there have more checks before GC anything
@@ -1563,223 +782,24 @@ public class SplitLogManager extends ZooKeeperListener {
new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
- removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
- }
- } else {
- removeRecoveringRegionsFromZK(null, null);
- }
- }
- }
- }
-
- /**
- * Asynchronous handler for zk create node results.
- * Retries on failures.
- */
- class CreateAsyncCallback implements AsyncCallback.StringCallback {
- private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
-
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
- if (rc != 0) {
- if (needAbandonRetries(rc, "Create znode " + path)) {
- createNodeFailure(path);
- return;
- }
- if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
- // What if there is a delete pending against this pre-existing
- // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
- // state. Only operations that will be carried out on this node by
- // this manager are get-znode-data, task-finisher and delete-znode.
- // And all code pieces correctly handle the case of suddenly
- // disappearing task-znode.
- LOG.debug("found pre-existing znode " + path);
- SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
- } else {
- Long retry_count = (Long)ctx;
- LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
- path + " remaining retries=" + retry_count);
- if (retry_count == 0) {
- SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
- createNodeFailure(path);
- } else {
- SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
- createNode(path, retry_count - 1);
- }
- return;
- }
- }
- createNodeSuccess(path);
- }
- }
-
- /**
- * Asynchronous handler for zk get-data-set-watch on node results.
- * Retries on failures.
- */
- class GetDataAsyncCallback implements AsyncCallback.DataCallback {
- private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
-
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data,
- Stat stat) {
- SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
- if (rc != 0) {
- if (needAbandonRetries(rc, "GetData from znode " + path)) {
- return;
- }
- if (rc == KeeperException.Code.NONODE.intValue()) {
- SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
- LOG.warn("task znode " + path + " vanished or not created yet.");
- // ignore since we should not end up in a case where there is in-memory task,
- // but no znode. The only case is between the time task is created in-memory
- // and the znode is created. See HBASE-11217.
- return;
- }
- Long retry_count = (Long) ctx;
-
- if (retry_count < 0) {
- LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
- path + ". Ignoring error. No error handling. No retrying.");
- return;
- }
- LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
- path + " remaining retries=" + retry_count);
- if (retry_count == 0) {
- SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
- getDataSetWatchFailure(path);
- } else {
- SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
- getDataSetWatch(path, retry_count - 1);
- }
- return;
- }
- try {
- getDataSetWatchSuccess(path, data, stat.getVersion());
- } catch (DeserializationException e) {
- LOG.warn("Deserialization problem", e);
- }
- return;
- }
- }
-
- /**
- * Asynchronous handler for zk delete node results.
- * Retries on failures.
- */
- class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
- private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
-
- @Override
- public void processResult(int rc, String path, Object ctx) {
- SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
- if (rc != 0) {
- if (needAbandonRetries(rc, "Delete znode " + path)) {
- failedDeletions.add(path);
- return;
- }
- if (rc != KeeperException.Code.NONODE.intValue()) {
- SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
- Long retry_count = (Long) ctx;
- LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
- path + " remaining retries=" + retry_count);
- if (retry_count == 0) {
- LOG.warn("delete failed " + path);
- failedDeletions.add(path);
- deleteNodeFailure(path);
- } else {
- deleteNode(path, retry_count - 1);
+ removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
}
- return;
} else {
- LOG.info(path +
- " does not exist. Either was created but deleted behind our" +
- " back by another pending delete OR was deleted" +
- " in earlier retry rounds. zkretries = " + (Long) ctx);
+ removeRecoveringRegions(null, null);
}
- } else {
- LOG.debug("deleted " + path);
}
- deleteNodeSuccess(path);
}
}
- /**
- * Asynchronous handler for zk create RESCAN-node results.
- * Retries on failures.
- * <p>
- * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal
- * for all the {@link SplitLogWorker}s to rescan for new tasks.
- */
- class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
- private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
-
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if (rc != 0) {
- if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
- return;
- }
- Long retry_count = (Long)ctx;
- LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
- " remaining retries=" + retry_count);
- if (retry_count == 0) {
- createRescanFailure();
- } else {
- createRescanNode(retry_count - 1);
- }
- return;
- }
- // path is the original arg, name is the actual name that was created
- createRescanSuccess(name);
- }
- }
-
- /**
- * {@link SplitLogManager} can use objects implementing this interface to
- * finish off a partially done task by {@link SplitLogWorker}. This provides
- * a serialization point at the end of the task processing. Must be
- * restartable and idempotent.
- */
- public interface TaskFinisher {
- /**
- * status that can be returned finish()
- */
- enum Status {
- /**
- * task completed successfully
- */
- DONE(),
- /**
- * task completed with error
- */
- ERR();
- }
- /**
- * finish the partially done task. workername provides clue to where the
- * partial results of the partially done tasks are present. taskname is the
- * name of the task that was put up in zookeeper.
- * <p>
- * @param workerName
- * @param taskname
- * @return DONE if task completed successfully, ERR otherwise
- */
- Status finish(ServerName workerName, String taskname);
+ public enum ResubmitDirective {
+ CHECK(), FORCE();
}
- enum ResubmitDirective {
- CHECK(),
- FORCE();
- }
-
- enum TerminationStatus {
- IN_PROGRESS("in_progress"),
- SUCCESS("success"),
- FAILURE("failure"),
- DELETED("deleted");
+ public enum TerminationStatus {
+ IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
String statusMsg;
+
TerminationStatus(String msg) {
statusMsg = msg;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 32ff083..dc13090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -90,7 +91,6 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -1525,8 +1526,8 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
- this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
- conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
+ this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
+ "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@@ -1579,7 +1580,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
- this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
+ this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
splitLogWorker.start();
}
@@ -2855,7 +2856,7 @@ public class HRegionServer extends HasThread implements
minSeqIdForLogReplay = storeSeqIdForReplay;
}
}
-
+
try {
long lastRecordedFlushedSequenceId = -1;
String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
@@ -2868,7 +2869,7 @@ public class HRegionServer extends HasThread implements
throw new InterruptedIOException();
}
if (data != null) {
- lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+ lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
}
if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
@@ -2881,11 +2882,11 @@ public class HRegionServer extends HasThread implements
LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
+ previousRSName);
} else {
- LOG.warn("Can't find failed region server for recovering region " +
+ LOG.warn("Can't find failed region server for recovering region " +
region.getEncodedName());
}
} catch (NoNodeException ignore) {
- LOG.debug("Region " + region.getEncodedName() +
+ LOG.debug("Region " + region.getEncodedName() +
" must have completed recovery because its recovery znode has been removed", ignore);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7d25bcd..0bd9067 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -160,6 +159,7 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@@ -1306,7 +1306,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (previous == null) {
// check if the region to be opened is marked in recovering state in ZK
- if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
+ if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
region.getEncodedName())) {
// check if current region open is for distributedLogReplay. This check is to support
// rolling restart/upgrade where we want to Master/RS see same configuration
@@ -1318,7 +1318,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// could happen when turn distributedLogReplay off from on.
List<String> tmpRegions = new ArrayList<String>();
tmpRegions.add(region.getEncodedName());
- SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions);
+ ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
+ tmpRegions);
}
}
// If there is no action in progress, we can submit a specific handler.