You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by la...@apache.org on 2014/08/01 01:03:50 UTC
git commit: [0.94] port HBASE-11217 Race between SplitLogManager task
creation + TimeoutMonitor. (LarsH - original patch by Enis)
Repository: hbase
Updated Branches:
refs/heads/0.94 197d3657c -> 812be0ca4
[0.94] port HBASE-11217 Race between SplitLogManager task creation + TimeoutMonitor. (LarsH - original patch by Enis)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/812be0ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/812be0ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/812be0ca
Branch: refs/heads/0.94
Commit: 812be0ca43f8ea0b6ce8e78249b170afa04b5767
Parents: 197d365
Author: Lars Hofhansl <la...@apache.org>
Authored: Thu Jul 31 16:03:12 2014 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Thu Jul 31 16:03:12 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/master/SplitLogManager.java | 43 ++++++++++++-------
.../hbase/master/TestSplitLogManager.java | 45 --------------------
2 files changed, 28 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/812be0ca/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 161a731..31981e7 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -111,7 +111,7 @@ public class SplitLogManager extends ZooKeeperListener {
private long resubmit_threshold;
private long timeout;
private long unassignedTimeout;
- private long lastNodeCreateTime = Long.MAX_VALUE;
+ private long lastTaskCreateTime = Long.MAX_VALUE;
public boolean ignoreZKDeleteForTesting = false;
private final ConcurrentMap<String, Task> tasks =
@@ -229,7 +229,7 @@ public class SplitLogManager extends ZooKeeperListener {
* @throws IOException
* if there was an error while splitting any log file
* @return cumulative size of the logfiles split
- * @throws IOException
+ * @throws IOException
*/
public long splitLogDistributed(final Path logDir) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
@@ -307,7 +307,7 @@ public class SplitLogManager extends ZooKeeperListener {
} catch (IOException ioe) {
FileStatus[] files = fs.listStatus(logDir);
if (files != null && files.length > 0) {
- LOG.warn("returning success without actually splitting and " +
+ LOG.warn("returning success without actually splitting and " +
"deleting all the log files in path " + logDir);
} else {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
@@ -325,7 +325,7 @@ public class SplitLogManager extends ZooKeeperListener {
/**
* Add a task entry to splitlog znode if it is not already there.
- *
+ *
* @param taskname the path of the log to be split
* @param batch the batch this task belongs to
* @return true if a new entry is created, false if it is already there.
@@ -333,6 +333,7 @@ public class SplitLogManager extends ZooKeeperListener {
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
tot_mgr_log_split_start.incrementAndGet();
String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
+ lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
Task oldtask = createTaskIfAbsent(path, batch);
if (oldtask == null) {
// publish the task in zk
@@ -461,7 +462,6 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void createNodeSuccess(String path) {
- lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.debug("put up splitlog task at znode " + path);
getDataSetWatch(path, zkretries);
}
@@ -476,7 +476,7 @@ public class SplitLogManager extends ZooKeeperListener {
private void getDataSetWatch(String path, Long retry_count) {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(path, this.watcher,
- new GetDataAsyncCallback(), retry_count);
+ new GetDataAsyncCallback(true), retry_count);
tot_mgr_get_data_queued.incrementAndGet();
}
@@ -484,7 +484,7 @@ public class SplitLogManager extends ZooKeeperListener {
// A negative retry count will lead to ignoring all error processing.
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(path, this.watcher,
- new GetDataAsyncCallback(), new Long(-1) /* retry count */);
+ new GetDataAsyncCallback(false), new Long(-1) /* retry count */);
tot_mgr_get_data_queued.incrementAndGet();
}
@@ -726,7 +726,7 @@ public class SplitLogManager extends ZooKeeperListener {
/**
* signal the workers that a task was resubmitted by creating the
* RESCAN node.
- * @throws KeeperException
+ * @throws KeeperException
*/
private void createRescanNode(long retries) {
// The RESCAN node will be deleted almost immediately by the
@@ -736,6 +736,7 @@ public class SplitLogManager extends ZooKeeperListener {
// might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
+ lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
this.watcher.getRecoverableZooKeeper().getZooKeeper().
create(ZKSplitLog.getRescanNode(watcher),
TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
@@ -744,7 +745,6 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void createRescanSuccess(String path) {
- lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
tot_mgr_rescan.incrementAndGet();
getDataSetWatch(path, zkretries);
}
@@ -1047,7 +1047,7 @@ public class SplitLogManager extends ZooKeeperListener {
// master should spawn both a manager and a worker thread to guarantee
// that there is always one worker in the system
if (tot > 0 && !found_assigned_task &&
- ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
+ ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
unassignedTimeout)) {
for (Map.Entry<String, Task> e : tasks.entrySet()) {
String path = e.getKey();
@@ -1126,6 +1126,17 @@ public class SplitLogManager extends ZooKeeperListener {
*/
class GetDataAsyncCallback implements AsyncCallback.DataCallback {
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+ private boolean completeTaskOnNoNode;
+
+ /**
+ * @param completeTaskOnNoNode Complete the task if the znode cannot be found.
+ * Since in-memory task creation and znode creation are not atomic, there might be
+ * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor).
+ * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
+ */
+ public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
+ this.completeTaskOnNoNode = completeTaskOnNoNode;
+ }
@Override
public void processResult(int rc, String path, Object ctx, byte[] data,
@@ -1137,11 +1148,13 @@ public class SplitLogManager extends ZooKeeperListener {
}
if (rc == KeeperException.Code.NONODE.intValue()) {
tot_mgr_get_data_nonode.incrementAndGet();
- // The task znode has been deleted. Must be some pending delete
- // that deleted the task. Assume success because a task-znode is
- // is only deleted after TaskFinisher is successful.
LOG.warn("task znode " + path + " vanished.");
- getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ if (completeTaskOnNoNode) {
+ // The task znode has been deleted. Must be some pending delete
+ // that deleted the task. Assume success because a task-znode is
+ // only deleted after TaskFinisher is successful.
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ }
return;
}
Long retry_count = (Long) ctx;
@@ -1284,7 +1297,7 @@ public class SplitLogManager extends ZooKeeperListener {
TerminationStatus(String msg) {
statusMsg = msg;
}
-
+
@Override
public String toString() {
return statusMsg;
http://git-wip-us.apache.org/repos/asf/hbase/blob/812be0ca/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ff87e3c..a60bc95 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -439,51 +439,6 @@ public class TestSplitLogManager {
assertFalse(fs.exists(emptyLogDirPath));
}
- @Test(timeout=45000)
- public void testVanishingTaskZNode() throws Exception {
- LOG.info("testVanishingTaskZNode");
- conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
- conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
- slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
- slm.finishInitialization();
- FileSystem fs = TEST_UTIL.getTestFileSystem();
- final Path logDir = new Path(fs.getWorkingDirectory(),
- UUID.randomUUID().toString());
- fs.mkdirs(logDir);
- Thread thread = null;
- try {
- Path logFile = new Path(logDir, UUID.randomUUID().toString());
- fs.createNewFile(logFile);
- thread = new Thread() {
- public void run() {
- try {
- // this call will block because there are no SplitLogWorkers,
- // until the task znode is deleted below. Then the call will
- // complete successfully, assuming the log is split.
- slm.splitLogDistributed(logDir);
- } catch (Exception e) {
- LOG.warn("splitLogDistributed failed", e);
- }
- }
- };
- thread.start();
- waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
- String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
- // remove the task znode, to finish the distributed log splitting
- ZKUtil.deleteNode(zkw, znode);
- waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
- waitForCounter(tot_mgr_log_split_batch_success, 0, 1, to/2);
- assertTrue(fs.exists(logFile));
- } finally {
- if (thread != null) {
- // interrupt the thread in case the test fails in the middle.
- // it has no effect if the thread is already terminated.
- thread.interrupt();
- }
- fs.delete(logDir, true);
- }
- }
-
@Test
public void testWorkerCrash() throws Exception {
conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);