You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@hbase.apache.org by la...@apache.org on 2014/08/01 01:03:50 UTC

git commit: [0.94] port HBASE-11217 Race between SplitLogManager task creation + TimeoutMonitor. (LarsH - original patch by Enis)

Repository: hbase
Updated Branches:
  refs/heads/0.94 197d3657c -> 812be0ca4


[0.94] port HBASE-11217 Race between SplitLogManager task creation + TimeoutMonitor. (LarsH - original patch by Enis)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/812be0ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/812be0ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/812be0ca

Branch: refs/heads/0.94
Commit: 812be0ca43f8ea0b6ce8e78249b170afa04b5767
Parents: 197d365
Author: Lars Hofhansl <la...@apache.org>
Authored: Thu Jul 31 16:03:12 2014 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Thu Jul 31 16:03:12 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/master/SplitLogManager.java    | 43 ++++++++++++-------
 .../hbase/master/TestSplitLogManager.java       | 45 --------------------
 2 files changed, 28 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/812be0ca/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 161a731..31981e7 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -111,7 +111,7 @@ public class SplitLogManager extends ZooKeeperListener {
   private long resubmit_threshold;
   private long timeout;
   private long unassignedTimeout;
-  private long lastNodeCreateTime = Long.MAX_VALUE;
+  private long lastTaskCreateTime = Long.MAX_VALUE;
   public boolean ignoreZKDeleteForTesting = false;
 
   private final ConcurrentMap<String, Task> tasks =
@@ -229,7 +229,7 @@ public class SplitLogManager extends ZooKeeperListener {
    * @throws IOException
    *             if there was an error while splitting any log file
    * @return cumulative size of the logfiles split
-   * @throws IOException 
+   * @throws IOException
    */
   public long splitLogDistributed(final Path logDir) throws IOException {
     List<Path> logDirs = new ArrayList<Path>();
@@ -307,7 +307,7 @@ public class SplitLogManager extends ZooKeeperListener {
       } catch (IOException ioe) {
         FileStatus[] files = fs.listStatus(logDir);
         if (files != null && files.length > 0) {
-          LOG.warn("returning success without actually splitting and " + 
+          LOG.warn("returning success without actually splitting and " +
               "deleting all the log files in path " + logDir);
         } else {
           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
@@ -325,7 +325,7 @@ public class SplitLogManager extends ZooKeeperListener {
 
   /**
    * Add a task entry to splitlog znode if it is not already there.
-   * 
+   *
    * @param taskname the path of the log to be split
    * @param batch the batch this task belongs to
    * @return true if a new entry is created, false if it is already there.
@@ -333,6 +333,7 @@ public class SplitLogManager extends ZooKeeperListener {
   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
     tot_mgr_log_split_start.incrementAndGet();
     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
+    lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
     Task oldtask = createTaskIfAbsent(path, batch);
     if (oldtask == null) {
       // publish the task in zk
@@ -461,7 +462,6 @@ public class SplitLogManager extends ZooKeeperListener {
   }
 
   private void createNodeSuccess(String path) {
-    lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
     LOG.debug("put up splitlog task at znode " + path);
     getDataSetWatch(path, zkretries);
   }
@@ -476,7 +476,7 @@ public class SplitLogManager extends ZooKeeperListener {
   private void getDataSetWatch(String path, Long retry_count) {
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
         getData(path, this.watcher,
-        new GetDataAsyncCallback(), retry_count);
+        new GetDataAsyncCallback(true), retry_count);
     tot_mgr_get_data_queued.incrementAndGet();
   }
 
@@ -484,7 +484,7 @@ public class SplitLogManager extends ZooKeeperListener {
     // A negative retry count will lead to ignoring all error processing.
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
         getData(path, this.watcher,
-        new GetDataAsyncCallback(), new Long(-1) /* retry count */);
+        new GetDataAsyncCallback(false), new Long(-1) /* retry count */);
     tot_mgr_get_data_queued.incrementAndGet();
   }
 
@@ -726,7 +726,7 @@ public class SplitLogManager extends ZooKeeperListener {
   /**
    * signal the workers that a task was resubmitted by creating the
    * RESCAN node.
-   * @throws KeeperException 
+   * @throws KeeperException
    */
   private void createRescanNode(long retries) {
     // The RESCAN node will be deleted almost immediately by the
@@ -736,6 +736,7 @@ public class SplitLogManager extends ZooKeeperListener {
     // might miss the watch-trigger that creation of RESCAN node provides.
     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
     // therefore this behavior is safe.
+    lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
       create(ZKSplitLog.getRescanNode(watcher),
         TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
@@ -744,7 +745,6 @@ public class SplitLogManager extends ZooKeeperListener {
   }
 
   private void createRescanSuccess(String path) {
-    lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
     tot_mgr_rescan.incrementAndGet();
     getDataSetWatch(path, zkretries);
   }
@@ -1047,7 +1047,7 @@ public class SplitLogManager extends ZooKeeperListener {
       // master should spawn both a manager and a worker thread to guarantee
       // that there is always one worker in the system
       if (tot > 0 && !found_assigned_task &&
-          ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
+          ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
           unassignedTimeout)) {
         for (Map.Entry<String, Task> e : tasks.entrySet()) {
           String path = e.getKey();
@@ -1126,6 +1126,17 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+    private boolean completeTaskOnNoNode;
+
+    /**
+     * @param completeTaskOnNoNode Complete the task if the znode cannot be found.
+     * Since in-memory task creation and znode creation are not atomic, there might be
+     * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor).
+     * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
+     */
+    public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
+      this.completeTaskOnNoNode = completeTaskOnNoNode;
+    }
 
     @Override
     public void processResult(int rc, String path, Object ctx, byte[] data,
@@ -1137,11 +1148,13 @@ public class SplitLogManager extends ZooKeeperListener {
         }
         if (rc == KeeperException.Code.NONODE.intValue()) {
           tot_mgr_get_data_nonode.incrementAndGet();
-          // The task znode has been deleted. Must be some pending delete
-          // that deleted the task. Assume success because a task-znode is
-          // is only deleted after TaskFinisher is successful.
           LOG.warn("task znode " + path + " vanished.");
-          getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          if (completeTaskOnNoNode) {
+            // The task znode has been deleted. Must be some pending delete
+            // that deleted the task. Assume success because a task-znode is
+            // is only deleted after TaskFinisher is successful.
+            getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          }
           return;
         }
         Long retry_count = (Long) ctx;
@@ -1284,7 +1297,7 @@ public class SplitLogManager extends ZooKeeperListener {
     TerminationStatus(String msg) {
       statusMsg = msg;
     }
-    
+
     @Override
     public String toString() {
       return statusMsg;

http://git-wip-us.apache.org/repos/asf/hbase/blob/812be0ca/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ff87e3c..a60bc95 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -439,51 +439,6 @@ public class TestSplitLogManager {
     assertFalse(fs.exists(emptyLogDirPath));
   }
 
-  @Test(timeout=45000)
-  public void testVanishingTaskZNode() throws Exception {
-    LOG.info("testVanishingTaskZNode");
-    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
-    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
-    slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
-    slm.finishInitialization();
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    final Path logDir = new Path(fs.getWorkingDirectory(),
-        UUID.randomUUID().toString());
-    fs.mkdirs(logDir);
-    Thread thread = null;
-    try {
-      Path logFile = new Path(logDir, UUID.randomUUID().toString());
-      fs.createNewFile(logFile);
-      thread = new Thread() {
-        public void run() {
-          try {
-            // this call will block because there are no SplitLogWorkers,
-            // until the task znode is deleted below. Then the call will
-            // complete successfully, assuming the log is split.
-            slm.splitLogDistributed(logDir);
-          } catch (Exception e) {
-            LOG.warn("splitLogDistributed failed", e);
-          }
-        }
-      };
-      thread.start();
-      waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
-      String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
-      // remove the task znode, to finish the distributed log splitting
-      ZKUtil.deleteNode(zkw, znode);
-      waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
-      waitForCounter(tot_mgr_log_split_batch_success, 0, 1, to/2);
-      assertTrue(fs.exists(logFile));
-    } finally {
-      if (thread != null) {
-        // interrupt the thread in case the test fails in the middle.
-        // it has no effect if the thread is already terminated.
-        thread.interrupt();
-      }
-      fs.delete(logDir, true);
-    }
-  }
-
   @Test
   public void testWorkerCrash() throws Exception {
     conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);