You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/06/24 10:15:56 UTC
svn commit: r1495946 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/procedure/
main/java/org/apache/hadoop/hbase/regionserver/snapshot/
test/java/org/apache/hadoop/hbase/procedure/
Author: mbertozzi
Date: Mon Jun 24 08:15:56 2013
New Revision: 1495946
URL: http://svn.apache.org/r1495946
Log:
HBASE-8783 RSSnapshotManager.ZKProcedureMemberRpcs may be initialized with the wrong server name
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java Mon Jun 24 08:15:56 2013
@@ -35,7 +35,7 @@ public interface ProcedureMemberRpcs ext
/**
* Initialize and start any threads or connections the member needs.
*/
- public void start(ProcedureMember member);
+ public void start(final String memberName, final ProcedureMember member);
/**
* Each subprocedure is being executed on a member. This is the identifier for the member.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java Mon Jun 24 08:15:56 2013
@@ -164,7 +164,7 @@ public class ZKProcedureCoordinatorRpcs
this.coordinator = coordinator;
try {
- this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
+ this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) return;
@@ -191,7 +191,7 @@ public class ZKProcedureCoordinatorRpcs
return false;
}
- LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
+ LOG.debug("Starting the controller for procedure member:" + coordName);
return true;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Mon Jun 24 08:15:56 2013
@@ -55,24 +55,23 @@ import com.google.protobuf.InvalidProtoc
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
-
private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
- private final String memberName;
+
+ private final ZKProcedureUtil zkController;
protected ProcedureMember member;
- private ZKProcedureUtil zkController;
+ private String memberName;
/**
- * Must call {@link #start(ProcedureMember)} before this can be used.
+ * Must call {@link #start(String, ProcedureMember)} before this can be used.
* @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
* {@link #close()}.
* @param procType name of the znode describing the procedure type
- * @param memberName name of the member to join the procedure
* @throws KeeperException if we can't reach zookeeper
*/
- public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
- String procType, String memberName) throws KeeperException {
- this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
+ public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
+ throws KeeperException {
+ this.zkController = new ZKProcedureUtil(watcher, procType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) {
@@ -114,7 +113,6 @@ public class ZKProcedureMemberRpcs imple
}
}
};
- this.memberName = memberName;
}
public ZKProcedureUtil getZkController() {
@@ -337,9 +335,10 @@ public class ZKProcedureMemberRpcs imple
}
}
- public void start(ProcedureMember listener) {
+ public void start(final String memberName, final ProcedureMember listener) {
LOG.debug("Starting procedure member '" + this.memberName + "'");
this.member = listener;
+ this.memberName = memberName;
watchForAbortedProcedures();
waitForNewProcedures();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Mon Jun 24 08:15:56 2013
@@ -64,8 +64,6 @@ public abstract class ZKProcedureUtil
protected final String reachedZnode;
protected final String abortZnode;
- protected final String memberName;
-
/**
* Top-level watcher/controller for procedures across the cluster.
* <p>
@@ -74,13 +72,11 @@ public abstract class ZKProcedureUtil
* @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
* {@link #close()}
* @param procDescription name of the znode describing the procedure to run
- * @param memberName name of the member from which we are interacting with running procedures
* @throws KeeperException when the procedure znodes cannot be created
*/
- public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
- String memberName) throws KeeperException {
+ public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
+ throws KeeperException {
super(watcher);
- this.memberName = memberName;
// make sure we are listening for events
watcher.registerListener(this);
// setup paths for the zknodes used in procedures
@@ -127,10 +123,6 @@ public abstract class ZKProcedureUtil
return acquiredZnode;
}
- public String getMemberName() {
- return memberName;
- }
-
/**
* Get the full znode path for the node used by the coordinator to trigger a global barrier
* acquire on each subprocedure.
@@ -189,7 +181,7 @@ public abstract class ZKProcedureUtil
return path.equals(acquiredZnode);
}
-
+
/**
* Is this in the procedure barrier acquired znode path
*/
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Mon Jun 24 08:15:56 2013
@@ -119,9 +119,8 @@ public class RegionServerSnapshotManager
throws KeeperException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
- String nodeName = rss.getServerName().toString();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
- SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName);
+ SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
@@ -129,7 +128,8 @@ public class RegionServerSnapshotManager
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
- ThreadPoolExecutor pool = ProcedureMember.defaultPool(nodeName, opThreads, keepAlive);
+ ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
+ opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
}
@@ -137,7 +137,8 @@ public class RegionServerSnapshotManager
* Start accepting snapshot requests.
*/
public void start() {
- this.memberRpcs.start(member);
+ LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
+ this.memberRpcs.start(rss.getServerName().toString(), member);
}
/**
@@ -282,7 +283,7 @@ public class RegionServerSnapshotManager
boolean hasTasks() {
return futures.size() != 0;
}
-
+
/**
* Submit a task to the pool.
*
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Mon Jun 24 08:15:56 2013
@@ -145,11 +145,11 @@ public class TestZKProcedure {
// start each member
for (String member : members) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
- ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
- comms.start(procMember);
+ comms.start(member, procMember);
}
// setup mock member subprocedures
@@ -219,11 +219,11 @@ public class TestZKProcedure {
expected.size());
for (String member : expected) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
- ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
- controller.start(mem);
+ controller.start(member, mem);
}
// setup mock subprocedures
@@ -311,7 +311,7 @@ public class TestZKProcedure {
try {
task.waitForCompleted();
} catch (ForeignException fe) {
- // this may get caught or may not
+ // this may get caught or may not
}
// -------------
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java?rev=1495946&r1=1495945&r2=1495946&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java Mon Jun 24 08:15:56 2013
@@ -88,7 +88,7 @@ public class TestZKProcedureControllers
final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
- watcher, "testSimple", COHORT_NODE_NAME);
+ watcher, "testSimple");
// mock out cohort member callbacks
final ProcedureMember member = Mockito
@@ -112,7 +112,7 @@ public class TestZKProcedureControllers
}).when(member).receivedReachedGlobalBarrier(operationName);
// start running the listener
- controller.start(member);
+ controller.start(COHORT_NODE_NAME, member);
// set a prepare node from a 'coordinator'
String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
@@ -386,9 +386,8 @@ public class TestZKProcedureControllers
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
- ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
- watcher, operationName, nodeName);
- cc.start(member);
+ ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
+ cc.start(nodeName, member);
cohortControllers.add(cc);
}
return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
@@ -411,9 +410,8 @@ public class TestZKProcedureControllers
// make a cohort controller for each expected node
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
- ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
- watcher, operationName, nodeName);
- cc.start(member);
+ ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
+ cc.start(nodeName, member);
cohortControllers.add(cc);
}