You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/04/05 03:21:14 UTC

svn commit: r1464804 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

Author: sershe
Date: Fri Apr  5 01:21:14 2013
New Revision: 1464804

URL: http://svn.apache.org/r1464804
Log:
HBASE-8092 bulk assignment in 0.94 doesn't handle ZK errors very well

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1464804&r1=1464803&r2=1464804&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Apr  5 01:21:14 2013
@@ -1468,13 +1468,21 @@ public class AssignmentManager extends Z
     for (int oldCounter = 0; true;) {
       int count = counter.get();
       if (oldCounter != count) {
-        LOG.info(destination.toString() + " unassigned znodes=" + count +
+        LOG.info(destination.toString() + " outstanding calls=" + count +
           " of total=" + total);
         oldCounter = count;
       }
       if (count == total) break;
       Threads.sleep(1);
     }
+    // Check if any failed.
+    if (cb.hasErrors()) {
+      // TODO: createOrForceNodeOffline actually handles this condition; whereas this
+      //       code used to just abort master. Now, it will bail more "gracefully".
+      LOG.error("Error creating nodes for some of the regions we are trying to bulk assign");
+      return;
+    }
+
     // Move on to open regions.
     try {
       // Send OPEN RPC. If it fails on a IOE or RemoteException, the
@@ -1523,6 +1531,7 @@ public class AssignmentManager extends Z
     private final ZooKeeperWatcher zkw;
     private final ServerName destination;
     private final AtomicInteger counter;
+    private final AtomicInteger errorCount = new AtomicInteger(0);
 
     CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
         final ServerName destination, final AtomicInteger counter) {
@@ -1531,13 +1540,22 @@ public class AssignmentManager extends Z
       this.counter = counter;
     }
 
+    boolean hasErrors() {
+      return this.errorCount.get() > 0;
+    }
+
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
+      if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+        LOG.warn("Node for " + path + " already exists");
+        reportCompletion(false);
+        return;
+      }
       if (rc != 0) {
-     // This is resultcode.  If non-zero, need to resubmit.
-        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
-          "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
-        this.zkw.abort("Connectionloss writing unassigned at " + path +
+        // This is resultcode. If non-zero, we will abort :(
+        LOG.warn("rc != 0 for " + path + " -- some error, may be retryable connection loss -- "
+            + "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+        this.zkw.abort("Some error, may be connection loss writing unassigned at " + path +
           ", rc=" + rc, null);
         return;
       }
@@ -1545,7 +1563,14 @@ public class AssignmentManager extends Z
       // Async exists to set a watcher so we'll get triggered when
       // unassigned node changes.
       this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
-        new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
+        new ExistsUnassignedAsyncCallback(this, destination), ctx);
+    }
+
+    void reportCompletion(boolean success) {
+      if (!success) {
+        this.errorCount.incrementAndGet();
+      }
+      this.counter.incrementAndGet();
     }
   }
 
@@ -1555,20 +1580,22 @@ public class AssignmentManager extends Z
    */
   static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
     private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
-    private final AtomicInteger counter;
     private ServerName destination;
+    private CreateUnassignedAsyncCallback parent;
 
-    ExistsUnassignedAsyncCallback(final AtomicInteger counter, ServerName destination) {
-      this.counter = counter;
+    ExistsUnassignedAsyncCallback(
+        CreateUnassignedAsyncCallback parent, ServerName destination) {
+      this.parent = parent;
       this.destination = destination;
     }
 
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
       if (rc != 0) {
-        // Thisis resultcode.  If non-zero, need to resubmit.
-        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
+        // This is resultcode.  If non-zero, need to resubmit.
+        LOG.warn("rc != 0 for " + path + " -- some error, may be connection loss -- " +
           "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+        parent.reportCompletion(false);
         return;
       }
       RegionState state = (RegionState)ctx;
@@ -1579,7 +1606,7 @@ public class AssignmentManager extends Z
       // call to open risks our writing PENDING_OPEN after state has been moved
       // to OPENING by the regionserver.
       state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), destination);
-      this.counter.addAndGet(1);
+      parent.reportCompletion(true);
     }
   }
 
@@ -1909,6 +1936,7 @@ public class AssignmentManager extends Z
       ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
         this.master.getServerName(), cb, ctx);
     } catch (KeeperException e) {
+      // TODO: this error handling will never execute, as the callback is async.
       if (e instanceof NodeExistsException) {
         LOG.warn("Node for " + state.getRegion() + " already exists");
       } else {