You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2015/05/11 23:19:50 UTC

hbase git commit: HBASE-13217 Procedure fails due to ZK issue

Repository: hbase
Updated Branches:
  refs/heads/master fa6dc9c44 -> 174632111


HBASE-13217 Procedure fails due to ZK issue


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/17463211
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/17463211
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/17463211

Branch: refs/heads/master
Commit: 174632111c1f844151eea682331d9b5b157e4dd4
Parents: fa6dc9c
Author: Jerry He <je...@apache.org>
Authored: Mon May 11 14:19:13 2015 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon May 11 14:19:13 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/procedure/ProcedureMember.java | 22 ++++++++------------
 .../hadoop/hbase/procedure/Subprocedure.java    |  9 ++++++--
 .../hbase/procedure/ZKProcedureMemberRpcs.java  | 18 ++++++++--------
 3 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/17463211/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
index 30b964f..1f22022 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -39,10 +38,6 @@ import com.google.common.collect.MapMaker;
  * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
  * specialized part of a {@link Procedure} that actually does procedure type-specific work
  * and reports back to the coordinator as it completes each phase.
- * <p>
- * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
- * currently running subprocedures are notify to failed since there is no longer a way to reach any
- * other members or coordinators since the rpcs are down.
  */
 @InterfaceAudience.Private
 public class ProcedureMember implements Closeable {
@@ -213,16 +208,17 @@ public class ProcedureMember implements Closeable {
    * other members since we cannot reach them anymore.
    * @param message description of the error
    * @param cause the actual cause of the failure
-   *
-   * TODO i'm tempted to just remove this code completely and treat it like any other abort.
-   * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
+   * @param procName the name of the procedure to cancel; if null, the error is only logged.
    */
-  public void controllerConnectionFailure(final String message, final IOException cause) {
-    Collection<Subprocedure> toNotify = subprocs.values();
+  public void controllerConnectionFailure(final String message, final Throwable cause,
+      final String procName) {
     LOG.error(message, cause);
-    for (Subprocedure sub : toNotify) {
-      // TODO notify the elements, if they aren't null
-      sub.cancel(message, cause);
+    if (procName == null) {
+      return;
+    }
+    Subprocedure toNotify = subprocs.get(procName);
+    if (toNotify != null) {
+      toNotify.cancel(message, cause);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/17463211/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
index 9ef5d23..8927338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Distributed procedure member's Subprocedure.  A procedure is sarted on a ProcedureCoordinator
@@ -106,8 +107,12 @@ abstract public class Subprocedure implements Callable<Void> {
           LOG.debug("Was remote foreign exception, not redispatching error", ee);
           return;
         }
-
-        // if it is local, then send it to the coordinator
+        // if this is a local KeeperException, don't attempt to notify other members
+        if (ee.getCause() instanceof KeeperException) {
+          LOG.debug("Was KeeperException, not redispatching error", ee);
+          return;
+        }
+        // if it is any other local error, then send it to the coordinator
         try {
           rpcs.sendMemberAborted(Subprocedure.this, ee);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/17463211/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
index 387c2e4..6ac2a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
@@ -143,7 +143,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       }
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to list children for abort node:"
-          + zkController.getAbortZnode(), new IOException(e));
+          + zkController.getAbortZnode(), e, null);
     }
   }
 
@@ -160,7 +160,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       }
     } catch (KeeperException e) {
       member.controllerConnectionFailure("General failure when watching for new procedures",
-        new IOException(e));
+        e, null);
     }
     if (runningProcedures == null) {
       LOG.debug("No running procedures.");
@@ -192,7 +192,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       }
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
-          + ") for procedure :" + opName, new IOException(e));
+          + ") for procedure :" + opName, e, opName);
       return;
     }
 
@@ -220,10 +220,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
-        new IOException(e));
+        e, opName);
     } catch (InterruptedException e) {
       member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
-          new IOException(e));
+        e, opName);
       Thread.currentThread().interrupt();
     }
   }
@@ -252,7 +252,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       }
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
-          + procName + " and member: " + memberName, new IOException(e));
+          + procName + " and member: " + memberName, e, procName);
     }
   }
 
@@ -274,7 +274,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
         ProtobufUtil.prependPBMagic(data));
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to post zk node:" + joinPath
-          + " to join procedure barrier.", new IOException(e));
+          + " to join procedure barrier.", e, procName);
     }
   }
 
@@ -301,7 +301,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       // that case we should still get an error for that procedure anyways
       zkController.logZKTree(zkController.getBaseZnode());
       member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
-          + " to abort procedure", new IOException(e));
+          + " to abort procedure", e, procName);
     }
   }
 
@@ -338,7 +338,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
       this.member.receiveAbortProcedure(opName, ee);
     } catch (KeeperException e) {
       member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
-          + zkController.getAbortZnode(), new IOException(e));
+          + zkController.getAbortZnode(), e, opName);
     } catch (InterruptedException e) {
       LOG.warn("abort already in progress", e);
       Thread.currentThread().interrupt();