You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/06/24 10:15:56 UTC

svn commit: r1495946 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/procedure/ main/java/org/apache/hadoop/hbase/regionserver/snapshot/ test/java/org/apache/hadoop/hbase/procedure/

Author: mbertozzi
Date: Mon Jun 24 08:15:56 2013
New Revision: 1495946

URL: http://svn.apache.org/r1495946
Log:
HBASE-8783 RSSnapshotManager.ZKProcedureMemberRpcs may be initialized with the wrong server name

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java Mon Jun 24 08:15:56 2013
@@ -35,7 +35,7 @@ public interface ProcedureMemberRpcs ext
   /**
    * Initialize and start any threads or connections the member needs.
    */
-  public void start(ProcedureMember member);
+  public void start(final String memberName, final ProcedureMember member);
 
   /**
    * Each subprocedure is being executed on a member.  This is the identifier for the member.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java Mon Jun 24 08:15:56 2013
@@ -164,7 +164,7 @@ public class ZKProcedureCoordinatorRpcs 
     this.coordinator = coordinator;
 
     try {
-      this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
+      this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
         @Override
         public void nodeCreated(String path) {
           if (!isInProcedurePath(path)) return;
@@ -191,7 +191,7 @@ public class ZKProcedureCoordinatorRpcs 
       return false;
     }
 
-    LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
+    LOG.debug("Starting the controller for procedure member:" + coordName);
     return true;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Mon Jun 24 08:15:56 2013
@@ -55,24 +55,23 @@ import com.google.protobuf.InvalidProtoc
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
-
   private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
-  private final String memberName;
+
+  private final ZKProcedureUtil zkController;
 
   protected ProcedureMember member;
-  private ZKProcedureUtil zkController;
+  private String memberName;
 
   /**
-   * Must call {@link #start(ProcedureMember)} before this can be used.
+   * Must call {@link #start(String, ProcedureMember)} before this can be used.
    * @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
    *          {@link #close()}.
    * @param procType name of the znode describing the procedure type
-   * @param memberName name of the member to join the procedure
    * @throws KeeperException if we can't reach zookeeper
    */
-  public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
-      String procType, String memberName) throws KeeperException {
-    this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
+  public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
+      throws KeeperException {
+    this.zkController = new ZKProcedureUtil(watcher, procType) {
       @Override
       public void nodeCreated(String path) {
         if (!isInProcedurePath(path)) {
@@ -114,7 +113,6 @@ public class ZKProcedureMemberRpcs imple
         }
       }
     };
-    this.memberName = memberName;
   }
 
   public ZKProcedureUtil getZkController() {
@@ -337,9 +335,10 @@ public class ZKProcedureMemberRpcs imple
     }
   }
 
-  public void start(ProcedureMember listener) {
+  public void start(final String memberName, final ProcedureMember listener) {
     LOG.debug("Starting procedure member '" + this.memberName + "'");
     this.member = listener;
+    this.memberName = memberName;
     watchForAbortedProcedures();
     waitForNewProcedures();
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Mon Jun 24 08:15:56 2013
@@ -64,8 +64,6 @@ public abstract class ZKProcedureUtil
   protected final String reachedZnode;
   protected final String abortZnode;
 
-  protected final String memberName;
-
   /**
    * Top-level watcher/controller for procedures across the cluster.
    * <p>
@@ -74,13 +72,11 @@ public abstract class ZKProcedureUtil
    * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
    *          {@link #close()}
    * @param procDescription name of the znode describing the procedure to run
-   * @param memberName name of the member from which we are interacting with running procedures
    * @throws KeeperException when the procedure znodes cannot be created
    */
-  public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
-      String memberName) throws KeeperException {
+  public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
+      throws KeeperException {
     super(watcher);
-    this.memberName = memberName;
     // make sure we are listening for events
     watcher.registerListener(this);
     // setup paths for the zknodes used in procedures
@@ -127,10 +123,6 @@ public abstract class ZKProcedureUtil
     return acquiredZnode;
   }
 
-  public String getMemberName() {
-    return memberName;
-  }
-
   /**
    * Get the full znode path for the node used by the coordinator to trigger a global barrier
    * acquire on each subprocedure.
@@ -189,7 +181,7 @@ public abstract class ZKProcedureUtil
     return path.equals(acquiredZnode);
   }
 
-  
+
   /**
    * Is this in the procedure barrier acquired znode path
    */

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Mon Jun 24 08:15:56 2013
@@ -119,9 +119,8 @@ public class RegionServerSnapshotManager
       throws KeeperException {
     this.rss = rss;
     ZooKeeperWatcher zkw = rss.getZooKeeper();
-    String nodeName = rss.getServerName().toString();
     this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
-        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName);
+        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
 
     // read in the snapshot request configuration properties
     Configuration conf = rss.getConfiguration();
@@ -129,7 +128,8 @@ public class RegionServerSnapshotManager
     int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
 
     // create the actual snapshot procedure member
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(nodeName, opThreads, keepAlive);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
+      opThreads, keepAlive);
     this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
   }
 
@@ -137,7 +137,8 @@ public class RegionServerSnapshotManager
    * Start accepting snapshot requests.
    */
   public void start() {
-    this.memberRpcs.start(member);
+    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
+    this.memberRpcs.start(rss.getServerName().toString(), member);
   }
 
   /**
@@ -282,7 +283,7 @@ public class RegionServerSnapshotManager
     boolean hasTasks() {
       return futures.size() != 0;
     }
-    
+
     /**
      * Submit a task to the pool.
      *

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Mon Jun 24 08:15:56 2013
@@ -145,11 +145,11 @@ public class TestZKProcedure {
     // start each member
     for (String member : members) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
-      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
       procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
-      comms.start(procMember);
+      comms.start(member, procMember);
     }
 
     // setup mock member subprocedures
@@ -219,11 +219,11 @@ public class TestZKProcedure {
         expected.size());
     for (String member : expected) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
-      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
       members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
-      controller.start(mem);
+      controller.start(member, mem);
     }
 
     // setup mock subprocedures
@@ -311,7 +311,7 @@ public class TestZKProcedure {
     try {
       task.waitForCompleted();
     } catch (ForeignException fe) {
-      // this may get caught or may not 
+      // this may get caught or may not
     }
 
     // -------------

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java Mon Jun 24 08:15:56 2013
@@ -88,7 +88,7 @@ public class TestZKProcedureControllers 
 
     final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
     final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
-        watcher, "testSimple", COHORT_NODE_NAME);
+        watcher, "testSimple");
 
     // mock out cohort member callbacks
     final ProcedureMember member = Mockito
@@ -112,7 +112,7 @@ public class TestZKProcedureControllers 
     }).when(member).receivedReachedGlobalBarrier(operationName);
 
     // start running the listener
-    controller.start(member);
+    controller.start(COHORT_NODE_NAME, member);
 
     // set a prepare node from a 'coordinator'
     String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
@@ -386,9 +386,8 @@ public class TestZKProcedureControllers 
 
       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
       for (String nodeName : expected) {
-        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
-            watcher, operationName, nodeName);
-        cc.start(member);
+        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
+        cc.start(nodeName, member);
         cohortControllers.add(cc);
       }
       return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
@@ -411,9 +410,8 @@ public class TestZKProcedureControllers 
       // make a cohort controller for each expected node
       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
       for (String nodeName : expected) {
-        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
-            watcher, operationName, nodeName);
-        cc.start(member);
+        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
+        cc.start(nodeName, member);
         cohortControllers.add(cc);
       }