You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/04/05 03:21:14 UTC
svn commit: r1464804 -
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Author: sershe
Date: Fri Apr 5 01:21:14 2013
New Revision: 1464804
URL: http://svn.apache.org/r1464804
Log:
HBASE-8092 bulk assignment in 0.94 doesn't handle ZK errors very well
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1464804&r1=1464803&r2=1464804&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Apr 5 01:21:14 2013
@@ -1468,13 +1468,21 @@ public class AssignmentManager extends Z
for (int oldCounter = 0; true;) {
int count = counter.get();
if (oldCounter != count) {
- LOG.info(destination.toString() + " unassigned znodes=" + count +
+ LOG.info(destination.toString() + " outstanding calls=" + count +
" of total=" + total);
oldCounter = count;
}
if (count == total) break;
Threads.sleep(1);
}
+ // Check if any failed.
+ if (cb.hasErrors()) {
+ // TODO: createOrForceNodeOffline actually handles this condition, whereas this
+ // code used to just abort the master. Now it bails out more "gracefully".
+ LOG.error("Error creating nodes for some of the regions we are trying to bulk assign");
+ return;
+ }
+
// Move on to open regions.
try {
// Send OPEN RPC. If it fails on a IOE or RemoteException, the
@@ -1523,6 +1531,7 @@ public class AssignmentManager extends Z
private final ZooKeeperWatcher zkw;
private final ServerName destination;
private final AtomicInteger counter;
+ private final AtomicInteger errorCount = new AtomicInteger(0);
CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
final ServerName destination, final AtomicInteger counter) {
@@ -1531,13 +1540,22 @@ public class AssignmentManager extends Z
this.counter = counter;
}
+ boolean hasErrors() {
+ return this.errorCount.get() > 0;
+ }
+
@Override
public void processResult(int rc, String path, Object ctx, String name) {
+ if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+ LOG.warn("Node for " + path + " already exists");
+ reportCompletion(false);
+ return;
+ }
if (rc != 0) {
- // This is resultcode. If non-zero, need to resubmit.
- LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
- "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
- this.zkw.abort("Connectionloss writing unassigned at " + path +
+ // This is resultcode. If non-zero, we will abort :(
+ LOG.warn("rc != 0 for " + path + " -- some error, may be retryable connection loss -- "
+ + "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+ this.zkw.abort("Some error, may be connection loss writing unassigned at " + path +
", rc=" + rc, null);
return;
}
@@ -1545,7 +1563,14 @@ public class AssignmentManager extends Z
// Async exists to set a watcher so we'll get triggered when
// unassigned node changes.
this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
- new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
+ new ExistsUnassignedAsyncCallback(this, destination), ctx);
+ }
+
+ void reportCompletion(boolean success) {
+ if (!success) {
+ this.errorCount.incrementAndGet();
+ }
+ this.counter.incrementAndGet();
}
}
@@ -1555,20 +1580,22 @@ public class AssignmentManager extends Z
*/
static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
- private final AtomicInteger counter;
private ServerName destination;
+ private CreateUnassignedAsyncCallback parent;
- ExistsUnassignedAsyncCallback(final AtomicInteger counter, ServerName destination) {
- this.counter = counter;
+ ExistsUnassignedAsyncCallback(
+ CreateUnassignedAsyncCallback parent, ServerName destination) {
+ this.parent = parent;
this.destination = destination;
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc != 0) {
- // Thisis resultcode. If non-zero, need to resubmit.
- LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
+ // This is the result code. If non-zero, report the failure to the parent callback.
+ LOG.warn("rc != 0 for " + path + " -- some error, may be connection loss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+ parent.reportCompletion(false);
return;
}
RegionState state = (RegionState)ctx;
@@ -1579,7 +1606,7 @@ public class AssignmentManager extends Z
// call to open risks our writing PENDING_OPEN after state has been moved
// to OPENING by the regionserver.
state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), destination);
- this.counter.addAndGet(1);
+ parent.reportCompletion(true);
}
}
@@ -1909,6 +1936,7 @@ public class AssignmentManager extends Z
ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
this.master.getServerName(), cb, ctx);
} catch (KeeperException e) {
+ // TODO: this error handling will never execute, as the callback is async.
if (e instanceof NodeExistsException) {
LOG.warn("Node for " + state.getRegion() + " already exists");
} else {