You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 19:39:52 UTC
svn commit: r1445818 [2/3] - in /hbase/branches/hbase-7290/hbase-server/src:
main/java/org/apache/hadoop/hbase/procedure/
main/java/org/apache/hadoop/hbase/zookeeper/
test/java/org/apache/hadoop/hbase/procedure/
Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Shared ZooKeeper-based znode management utilities for distributed procedures. All znode
+ * operations should go through the provided methods in coordinators and members.
+ * <p>
+ * Layout of nodes in ZK is
+ * <pre>
+ * /hbase/[op name]/acquired/
+ *                    [op instance] - op data/
+ *                        /[nodes that have acquired]
+ *                 /reached/
+ *                    [op instance]/
+ *                        /[nodes that have completed]
+ *                 /abort/
+ *                    [op instance] - failure data
+ * </pre>
+ * NOTE: while the per-instance acquired and reached znodes are dirs (one child per member),
+ * the per-instance abort znode is just a znode.
+ * <p>
+ * Assumption here that procedure names are unique
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class ZKProcedureUtil
+    extends ZooKeeperListener implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
+
+  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
+  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
+  public static final String ABORT_ZNODE_DEFAULT = "abort";
+
+  /** Separator between znode path components, used for strict path-prefix checks. */
+  private static final String ZNODE_SEPARATOR = "/";
+
+  public final String baseZNode;
+  protected final String acquiredZnode;
+  protected final String reachedZnode;
+  protected final String abortZnode;
+
+  protected final String memberName;
+
+  /**
+   * Top-level watcher/controller for procedures across the cluster.
+   * <p>
+   * On instantiation, this ensures the procedure znodes exist. This however requires calling
+   * {@code start} to start monitoring for running procedures.
+   * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
+   *          {@link #close()}
+   * @param procDescription name of the znode describing the procedure to run
+   * @param memberName name of the member from which we are interacting with running procedures
+   * @throws KeeperException when the procedure znodes cannot be created
+   */
+  public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
+      String memberName) throws KeeperException {
+    super(watcher);
+    this.memberName = memberName;
+    // setup paths for the zknodes used in procedures
+    this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
+    acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
+    reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
+    abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
+
+    // first make sure all the ZK nodes exist
+    // make sure all the parents exist (sometimes not the case in tests)
+    ZKUtil.createWithParents(watcher, acquiredZnode);
+    // regular create because all the parents exist
+    ZKUtil.createAndFailSilent(watcher, reachedZnode);
+    ZKUtil.createAndFailSilent(watcher, abortZnode);
+
+    // Register for events only after every path field is assigned and the base znodes exist.
+    // Registering earlier lets `this` escape half-constructed: an event delivered before the
+    // assignments above would be classified against null paths. It would also leave a
+    // registered listener behind if one of the creates above throws.
+    watcher.registerListener(this);
+  }
+
+  /**
+   * Closes the watcher handed to the constructor, if any.
+   */
+  @Override
+  public void close() throws IOException {
+    if (watcher != null) {
+      watcher.close();
+    }
+  }
+
+  /** @return full path of the acquired-barrier znode for the given procedure instance */
+  public String getAcquiredBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
+  }
+
+  /** @return full path of the reached-barrier znode for the given procedure instance */
+  public String getReachedBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
+  }
+
+  /** @return full path of the abort znode for the given procedure instance */
+  public String getAbortZNode(String opInstanceName) {
+    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
+  }
+
+  /** @return full path of the parent abort znode */
+  public String getAbortZnode() {
+    return abortZnode;
+  }
+
+  /** @return full path of the base znode for this procedure type */
+  public String getBaseZnode() {
+    return baseZNode;
+  }
+
+  /** @return full path of the parent acquired-barrier znode */
+  public String getAcquiredBarrier() {
+    return acquiredZnode;
+  }
+
+  /** @return name of the member this instance acts for */
+  public String getMemberName() {
+    return memberName;
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * acquire on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the prepare barrier/start node
+   */
+  public static String getAcquireBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * execution and release on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the commit barrier
+   */
+  public static String getReachedBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator or member to trigger an abort
+   * of the global barrier acquisition or execution in subprocedures.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the abort znode
+   */
+  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
+    return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
+  }
+
+  /** @return the watcher handed to the constructor */
+  public ZooKeeperWatcher getWatcher() {
+    return watcher;
+  }
+
+  /**
+   * Is this a procedure related znode path?
+   * @param path full znode path to test
+   * @return true if {@code path} is the base procedure znode or lies underneath it. A sibling
+   *         znode that merely shares the base znode's name as a prefix is not matched.
+   */
+  public boolean isInProcedurePath(String path) {
+    return isSameOrDescendant(path, baseZNode);
+  }
+
+  /**
+   * Is this in the procedure barrier acquired znode path
+   * @param path full znode path to test
+   * @return true if {@code path} lies strictly below the acquired znode
+   */
+  public boolean isAcquiredPathNode(String path) {
+    return isStrictDescendant(path, acquiredZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier reached znode path
+   * @param path full znode path to test
+   * @return true if {@code path} lies strictly below the reached znode
+   */
+  public boolean isReachedPathNode(String path) {
+    return isStrictDescendant(path, reachedZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier abort znode path
+   * @param path full znode path to test
+   * @return true if {@code path} lies strictly below the abort znode
+   */
+  public boolean isAbortPathNode(String path) {
+    return isStrictDescendant(path, abortZnode);
+  }
+
+  /** @return true if {@code path} equals {@code parent} or lies strictly below it */
+  private static boolean isSameOrDescendant(String path, String parent) {
+    return path.equals(parent) || isStrictDescendant(path, parent);
+  }
+
+  /**
+   * @return true if {@code path} lies strictly below {@code parent}. Requiring the separator
+   *         after the parent keeps e.g. {@code parent + "X"} from matching.
+   */
+  private static boolean isStrictDescendant(String path, String parent) {
+    return path.startsWith(parent + ZNODE_SEPARATOR);
+  }
+
+  // --------------------------------------------------------------------------
+  // internal debugging methods
+  // --------------------------------------------------------------------------
+  /**
+   * Recursively log the current state of ZK at DEBUG level (non-transactional). Does nothing
+   * unless DEBUG logging is enabled.
+   * @param root name of the root directory in zk to print
+   * @throws RuntimeException wrapping any {@link KeeperException} raised while walking the tree
+   */
+  public void logZKTree(String root) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug("Current zk system:");
+    String prefix = "|-";
+    LOG.debug(prefix + root);
+    try {
+      logZKTree(root, prefix);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Helper method to log each child of {@code root} and recurse into it with a longer prefix.
+   * @see #logZKTree(String)
+   * @param root znode whose children are logged
+   * @param prefix string prepended to each logged child name
+   * @throws KeeperException if an unexpected exception occurs
+   */
+  protected void logZKTree(String root, String prefix) throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
+    if (children == null) {
+      return;
+    }
+    for (String child : children) {
+      LOG.debug(prefix + child);
+      // special-case "/" so the child path is not built with a doubled separator
+      String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+      logZKTree(node, prefix + "---");
+    }
+  }
+
+  /**
+   * Recursively delete the children of the acquired, reached and abort znodes.
+   * @throws KeeperException if an unexpected zookeeper exception occurs
+   */
+  public void clearChildZNodes() throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+
+    // If the coordinator was shutdown mid-procedure, then we are going to lose
+    // a procedure that was previously started by cleaning out all the previous state. It's much
+    // harder to figure out how to keep a procedure going; that is the subject of HBASE-5487.
+    ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
+  }
+
+  /**
+   * Recursively delete the acquired, reached and abort znodes of a single procedure instance.
+   * @param procedureName name of the procedure instance whose znodes are removed
+   * @throws KeeperException if an unexpected zookeeper exception occurs
+   */
+  public void clearZNodes(String procedureName) throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+
+    ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
+  }
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1445818&r1=1445817&r2=1445818&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Feb 13 18:39:51 2013
@@ -23,21 +23,16 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
-import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
-import javax.security.auth.login.LoginException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +44,8 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -56,9 +53,9 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.server.ZooKeeperSaslServer;
/**
@@ -849,6 +846,10 @@ public class ZKUtil {
/**
* Set data into node creating node if it doesn't yet exist.
* Does not set watch.
+ *
+ * WARNING: this is not atomic -- it is possible to get a 0-byte data value in the znode before
+ * data is written
+ *
* @param zkw zk reference
* @param znode path of node
* @param data data to set for node
@@ -1070,7 +1071,7 @@ public class ZKUtil {
}
/**
- * Creates the specified node, if the node does not exist. Does not set a
+ * Creates the specified node, iff the node does not exist. Does not set a
* watch and fails silently if the node already exists.
*
* The node created is persistent and open access.
@@ -1082,11 +1083,28 @@ public class ZKUtil {
public static void createAndFailSilent(ZooKeeperWatcher zkw,
String znode)
throws KeeperException {
+ createAndFailSilent(zkw, znode, new byte[0]);
+ }
+
+ /**
+ * Creates the specified node containing specified data, iff the node does not exist. Does
+ * not set a watch and fails silently if the node already exists.
+ *
+ * The node created is persistent and open access.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data a byte array data to store in the znode
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void createAndFailSilent(ZooKeeperWatcher zkw,
+ String znode, byte[] data)
+ throws KeeperException {
try {
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
waitForZKConnectionIfAuthenticating(zkw);
if (zk.exists(znode, false) == null) {
- zk.create(znode, new byte[0], createACL(zkw,znode),
+ zk.create(znode, data, createACL(zkw,znode),
CreateMode.PERSISTENT);
}
} catch(KeeperException.NodeExistsException nee) {
@@ -1118,12 +1136,30 @@ public class ZKUtil {
*/
public static void createWithParents(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
+ createWithParents(zkw, znode, new byte[0]);
+ }
+
+ /**
+ * Creates the specified node and all parent nodes required for it to exist. The creation of
+ * parent znodes is not atomic with the leaf znode creation but the data is written atomically
+ * when the leaf node is created.
+ *
+ * No watches are set and no errors are thrown if the node already exists.
+ *
+ * The nodes created are persistent and open access.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data data to store in the leaf znode
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void createWithParents(ZooKeeperWatcher zkw, String znode, byte[] data)
+ throws KeeperException {
try {
if(znode == null) {
return;
}
waitForZKConnectionIfAuthenticating(zkw);
- zkw.getRecoverableZooKeeper().create(znode, new byte[0], createACL(zkw, znode),
+ zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
CreateMode.PERSISTENT);
} catch(KeeperException.NodeExistsException nee) {
return;
@@ -1422,4 +1458,37 @@ public class ZKUtil {
ke.initCause(e);
return ke;
}
+
+  /**
+   * Recursively log the current state of a ZK subtree at DEBUG level (non-transactional: each
+   * level is listed with a separate call, so the tree may change while it is walked). Does
+   * nothing unless DEBUG logging is enabled.
+   * @param zkw zk reference
+   * @param root name of the root directory in zk to print
+   * @throws RuntimeException wrapping any {@link KeeperException} raised while walking the tree
+   */
+  public static void logZKTree(ZooKeeperWatcher zkw, String root) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug("Current zk system:");
+    String prefix = "|-";
+    LOG.debug(prefix + root);
+    try {
+      logZKTree(zkw, root, prefix);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Helper method to log each child of {@code root} and recurse into it with a longer prefix.
+   * @see #logZKTree(ZooKeeperWatcher, String)
+   * @param zkw zk reference
+   * @param root znode whose children are logged
+   * @param prefix string prepended to each logged child name; grows by "---" per level
+   * @throws KeeperException if an unexpected exception occurs
+   */
+  protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix)
+      throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
+    if (children == null) {
+      return;
+    }
+    for (String child : children) {
+      LOG.debug(prefix + child);
+      // special-case "/" so the child path is not built with a doubled separator
+      String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+      logZKTree(zkw, node, prefix + "---");
+    }
+  }
+
}
Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Demonstrates how {@link Procedure} handles single members, multiple members, and error
+ * propagation semantics.
+ */
+@Category(SmallTests.class)
+public class TestProcedure {
+
+  ProcedureCoordinator coord;
+
+  @Before
+  public void setup() {
+    coord = mock(ProcedureCoordinator.class);
+    final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
+    when(coord.getRpcs()).thenReturn(comms); // make it not null
+  }
+
+  /**
+   * Procedure whose global-barrier hooks only count down a latch each, so a test can wait for
+   * the coordinator thread to reach a given phase.
+   */
+  class LatchedProcedure extends Procedure {
+    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
+    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
+    CountDownLatch completedProcedure = new CountDownLatch(1);
+
+    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
+        long wakeFreq, long timeout, String opName, byte[] data,
+        List<String> expectedMembers) {
+      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
+    }
+
+    @Override
+    public void sendGlobalBarrierStart() {
+      startedAcquireBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierReached() {
+      startedDuringBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierComplete() {
+      completedProcedure.countDown();
+    }
+  }
+
+  /**
+   * Runs {@link Procedure#call()} on {@code proc} in a new thread that plays the coordinator.
+   * @return the started thread, so the test can join it
+   */
+  private static Thread startCoordinator(final Procedure proc) {
+    Thread coordinator = new Thread() {
+      @Override
+      public void run() {
+        proc.call();
+      }
+    };
+    coordinator.start();
+    return coordinator;
+  }
+
+  /**
+   * With a single member, verify ordered execution. The Coordinator side is run in a separate
+   * thread so we can only trigger from members and wait for particular state latches.
+   */
+  @Test(timeout = 1000)
+  public void testSingleMember() throws Exception {
+    // The member
+    List<String> members = new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // coordinator: start the barrier procedure
+    Thread coordinator = startCoordinator(procspy);
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    procspy.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called, and others are
+    // blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString());
+
+    // member: trigger global barrier acquisition (on the spy, the same instance the
+    // coordinator thread runs, rather than relying on the spy sharing the real one's fields)
+    procspy.barrierAcquiredByMember(members.get(0));
+
+    // coordinator: wait for global barrier to be acquired.
+    procspy.acquiredBarrierLatch.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // since two threads, we cannot guarantee that {@link Procedure#sendGlobalBarrierReached()}
+    // was or was not called here.
+
+    // member: trigger global barrier release
+    procspy.barrierReleasedByMember(members.get(0));
+
+    // coordinator: wait for procedure to be completed
+    procspy.completedProcedure.await();
+    coordinator.join();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testMultipleMember() throws Exception {
+    // 2 members
+    List<String> members = new ArrayList<String>();
+    members.add("member1");
+    members.add("member2");
+
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // start the barrier procedure
+    Thread coordinator = startCoordinator(procspy);
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    procspy.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called, and others are
+    // blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
+
+    // member 1: [1/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(0));
+
+    // coordinator not satisfied.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+
+    // member 2: [2/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(1));
+
+    // coordinator: wait for global barrier to be acquired.
+    procspy.startedDuringBarrier.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // members 1 and 2: trigger global barrier release
+    procspy.barrierReleasedByMember(members.get(0));
+    procspy.barrierReleasedByMember(members.get(1));
+
+    // coordinator wait for procedure to be completed
+    procspy.completedProcedure.await();
+    coordinator.join();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testErrorPropagation() throws Exception {
+    List<String> members = new ArrayList<String>();
+    members.add("member");
+    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final Procedure procspy = spy(proc);
+
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    procspy.receive(cause);
+
+    // start the barrier procedure
+    Thread coordinator = startCoordinator(procspy);
+    coordinator.join();
+
+    verify(procspy, never()).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+  }
+
+  @Test(timeout = 1000)
+  public void testBarrieredErrorPropagation() throws Exception {
+    List<String> members = new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+
+    // start the barrier procedure
+    Thread coordinator = startCoordinator(procspy);
+
+    // now test that we can put an error in before the commit phase runs
+    procspy.startedAcquireBarrier.await();
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    procspy.receive(cause);
+    procspy.barrierAcquiredByMember(members.get(0));
+    coordinator.join();
+
+    // verify state of all the object
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).sendGlobalBarrierReached();
+  }
+}
\ No newline at end of file
Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test Procedure coordinator operation.
+ * <p>
+ * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
+ * level parallelization this class will likely throw all kinds of errors.
+ */
+@Category(SmallTests.class)
+public class TestProcedureCoordinator {
+  // general test constants
+  private static final long WAKE_FREQUENCY = 1000;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+  private static final String nodeName = "node";
+  private static final String procName = "some op";
+  private static final byte[] procData = new byte[0];
+  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
+
+  // setup the mocks
+  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
+  private final Procedure task = mock(Procedure.class);
+  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
+
+  // handle to the coordinator for each test. Tests must assign this field (not a local variable)
+  // so that resetTest() closes the coordinator they built.
+  private ProcedureCoordinator coordinator;
+
+  /**
+   * Reset the shared mocks and close the coordinator built by the test, if any.
+   * @throws IOException if closing the coordinator fails
+   */
+  @After
+  public void resetTest() throws IOException {
+    // reset all the mocks used for the tests
+    reset(controller, task, monitor);
+    // close the open coordinator, if it was used
+    if (coordinator != null) {
+      coordinator.close();
+    }
+  }
+
+  /**
+   * Build a Mockito spy around a real {@link ProcedureCoordinator} that uses the mocked
+   * {@code controller} and a pool from {@link ProcedureCoordinator#defaultPool}.
+   * @return the spied-upon coordinator
+   */
+  private ProcedureCoordinator buildNewCoordinator() {
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+    return spy(new ProcedureCoordinator(controller, pool));
+  }
+
+  /**
+   * Currently we can only handle one procedure at a time. This makes sure we handle that and
+   * reject submitting more.
+   */
+  @Test
+  public void testThreadPoolSize() throws Exception {
+    // Assign the field rather than declaring a local: a local would shadow the field, so
+    // resetTest() would never close this coordinator and its first procedure would be left
+    // running (waiting on members that never answer) after the test ends.
+    coordinator = buildNewCoordinator();
+    Procedure proc = new Procedure(coordinator, monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
+    Procedure procSpy = spy(proc);
+
+    Procedure proc2 = new Procedure(coordinator, monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
+    Procedure procSpy2 = spy(proc2);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(procSpy, procSpy2);
+
+    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
+    // null here means second procedure failed to start.
+    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
+        coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the acquiring phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringPrepare() throws Exception {
+    coordinator = buildNewCoordinator();
+    // setup the proc
+    List<String> expected = Arrays.asList("cohort");
+    Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, expected);
+    final Procedure procSpy = spy(proc);
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(procSpy);
+
+    // use the passed controller responses
+    IOException cause = new IOException("Failed to reach comms during acquire");
+    doThrow(cause).when(controller)
+        .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
+
+    // run the operation
+    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    proc.waitForCompleted();
+    verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
+    verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the barrier phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringCommit() throws Exception {
+    coordinator = buildNewCoordinator();
+
+    // setup the task and spy on it
+    List<String> expected = Arrays.asList("cohort");
+    final Procedure spy = spy(new Procedure(coordinator,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(spy);
+
+    // use the passed controller responses
+    IOException cause = new IOException("Failed to reach controller during prepare");
+    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
+        .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
+    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+    verify(spy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
+        eq(procData), anyListOf(String.class));
+    verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testNoCohort() throws Exception {
+    runSimpleProcedure();
+  }
+
+  @Test(timeout = 1000)
+  public void testSingleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one");
+  }
+
+  @Test(timeout = 1000)
+  public void testMultipleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one", "two", "three", "four");
+  }
+
+  /**
+   * Build a fresh coordinator and run a spied {@link Procedure} over the given members, with every
+   * member acquiring and then finishing the barrier in order.
+   * @param members expected cohort members (may be empty)
+   * @throws Exception on failure
+   */
+  public void runSimpleProcedure(String... members) throws Exception {
+    coordinator = buildNewCoordinator();
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(members));
+    final Procedure spy = spy(task);
+    runCoordinatedProcedure(spy, members);
+  }
+
+  /**
+   * Test that if nodes join the barrier early we still correctly handle the progress
+   */
+  @Test(timeout = 1000)
+  public void testEarlyJoiningBarrier() throws Exception {
+    final String[] cohort = new String[] { "one", "two", "three", "four" };
+    coordinator = buildNewCoordinator();
+    final ProcedureCoordinator ref = coordinator;
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(cohort));
+    final Procedure spy = spy(task);
+
+    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
+      @Override
+      public void doWork() {
+        // then do some fun where we commit before all nodes have prepared
+        // "one" commits before anyone else is done
+        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[0]);
+        // but "two" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
+        // "three" jumps ahead
+        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[2]);
+        // and "four" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
+      }
+    };
+
+    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
+      @Override
+      public void doWork() {
+        ref.memberFinishedBarrier(opName, this.cohort[1]);
+        ref.memberFinishedBarrier(opName, this.cohort[3]);
+      }
+    };
+    runCoordinatedOperation(spy, prepare, commit, cohort);
+  }
+
+  /**
+   * Just run a procedure with the standard name and data, with no special task for the mock
+   * coordinator (it works just like a regular coordinator). For custom behavior see
+   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
+   * .
+   * @param spy Spy on a real {@link Procedure}
+   * @param cohort expected cohort members
+   * @throws Exception on failure
+   */
+  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
+        new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  /**
+   * Run a procedure with a custom acquire-phase answer and the default barrier answer.
+   */
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  /**
+   * Run a procedure with the default acquire-phase answer and a custom barrier answer.
+   */
+  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
+  }
+
+  /**
+   * Stub the coordinator to hand out {@code spy}, stub the controller with the given answers, run
+   * the procedure to completion and verify the acquire/reached calls happened in order.
+   * @param spy Spy on a real {@link Procedure}
+   * @param prepareOperation answer for {@code sendGlobalBarrierAcquire}
+   * @param commitOperation answer for {@code sendGlobalBarrierReached}
+   * @param cohort expected cohort members
+   * @throws Exception on failure
+   */
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
+      BarrierAnswer commitOperation, String... cohort) throws Exception {
+    List<String> expected = Arrays.asList(cohort);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(spy);
+
+    // use the passed controller responses
+    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
+    doAnswer(commitOperation).when(controller)
+        .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+
+    // make sure we mocked correctly
+    prepareOperation.ensureRan();
+    // we never got an exception
+    InOrder inorder = inOrder(spy, controller);
+    inorder.verify(spy).sendGlobalBarrierStart();
+    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
+    inorder.verify(spy).sendGlobalBarrierReached();
+    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
+  }
+
+  /**
+   * Mockito answer that records that it was invoked and delegates to {@link #doWork()}.
+   */
+  private abstract class OperationAnswer implements Answer<Void> {
+    private boolean ran = false;
+
+    public void ensureRan() {
+      assertTrue("Prepare mocking didn't actually run!", ran);
+    }
+
+    @Override
+    public final Void answer(InvocationOnMock invocation) throws Throwable {
+      this.ran = true;
+      doWork();
+      return null;
+    }
+
+    protected abstract void doWork() throws Throwable;
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has prepared
+   */
+  private class AcquireBarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public AcquireBarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
+      }
+    }
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has committed
+   */
+  private class BarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public BarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
+      }
+    }
+  }
+}
\ No newline at end of file
Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the procedure member, and its error handling mechanisms.
+ */
+@Category(SmallTests.class)
+public class TestProcedureMember {
+  private static final long WAKE_FREQUENCY = 100;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+
+  private final String op = "some op";
+  private final byte[] data = new byte[0];
+  private final ForeignExceptionDispatcher mockListener = Mockito
+      .spy(new ForeignExceptionDispatcher());
+  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
+  private final ProcedureMemberRpcs mockMemberComms = Mockito
+      .mock(ProcedureMemberRpcs.class);
+  private ProcedureMember member;
+  private ForeignExceptionDispatcher dispatcher;
+  Subprocedure spySub;
+
+  /**
+   * Reset all the mock objects and close the member built by the test, if any.
+   */
+  @After
+  public void resetTest() {
+    reset(mockListener, mockBuilder, mockMemberComms);
+    if (member != null) {
+      try {
+        member.close();
+      } catch (IOException e) {
+        // best-effort cleanup: report the failure but do not fail the test in teardown
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Build a member using the class level mocks
+   * @return member to use for tests
+   */
+  private ProcedureMember buildCohortMember() {
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
+  }
+
+  /**
+   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
+   */
+  private void buildCohortMemberPair() throws IOException {
+    dispatcher = new ForeignExceptionDispatcher();
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
+    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
+    spySub = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
+    addCommitAnswer();
+  }
+
+
+  /**
+   * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
+   */
+  private void addCommitAnswer() throws IOException {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        member.receivedReachedGlobalBarrier(op);
+        return null;
+      }
+    }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+  }
+
+  /**
+   * Test the normal sub procedure execution case.
+   */
+  @Test(timeout = 500)
+  public void testSimpleRun() throws Exception {
+    member = buildCohortMember();
+    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
+    EmptySubprocedure spy = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // when we get a prepare, then start the commit phase
+    addCommitAnswer();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc1 = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc1);
+    // and wait for the subprocedure that was actually submitted to finish
+    subproc1.waitForLocallyCompleted();
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy);
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
+    order.verify(spy).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
+    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
+        any(ForeignException.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when we have an exception during
+   * {@link Subprocedure#acquireBarrier()}.
+   */
+  @Test(timeout = 1000)
+  public void testMemberPrepareException() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception on Subprocedure's acquireBarrier
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in member acquireBarrier");
+          }
+        }).when(spySub).acquireBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when we fail to tell the coordinator that the barrier was
+   * acquired.
+   */
+  @Test(timeout = 1000)
+  public void testSendMemberAcquiredCommsFailure() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception when notifying the coordinator that the member acquired the barrier
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
+   * running {@link Subprocedure#acquireBarrier()} -- it needs to finish first, and then the abort
+   * is checked. Thus, {@link Subprocedure#acquireBarrier()} should succeed but later get rolled
+   * back via {@link Subprocedure#cleanup}.
+   */
+  @Test(timeout = 1000)
+  public void testCoordinatorAbort() throws Exception {
+    buildCohortMemberPair();
+
+    // mock that another node timed out or failed to prepare
+    final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("bogus message", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(spySub).waitForReachedGlobalBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle failures if a member's commit phase fails.
+   *
+   * NOTE: This is the core difference that makes this different from traditional 2PC. In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member. Members are then responsible for reading its TX log. This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+   */
+  @Test(timeout = 1000)
+  public void testMemberCommitException() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception on Subprocedure's insideBarrier (commit) phase
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(spySub).insideBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    order.verify(spySub).insideBarrier();
+
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
+   *
+   * NOTE: This is the core difference that makes this different from traditional 2PC. In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member. Members are then responsible for reading its TX log. This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+   */
+  @Test(timeout = 1000)
+  public void testMemberCommitCommsFailure() throws Exception {
+    buildCohortMemberPair();
+    final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("commit comms fail", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    order.verify(spySub).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Fail correctly on getting an external error while waiting for the prepared latch
+   * @throws Exception on failure
+   */
+  @Test(timeout = 1000)
+  public void testPropagateConnectionErrorBackToManager() throws Exception {
+    // setup the operation
+    member = buildCohortMember();
+    ProcedureMember memberSpy = spy(member);
+
+    // setup the commit and the spy (local dispatcher; does not touch the class field)
+    final ForeignExceptionDispatcher localDispatcher = new ForeignExceptionDispatcher();
+    ForeignExceptionDispatcher dispSpy = spy(localDispatcher);
+    Subprocedure commit = new EmptySubprocedure(member, localDispatcher);
+    Subprocedure spy = spy(commit);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // fail during the prepare phase
+    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
+    // and throw a connection error when we try to tell the controller about it
+    doThrow(new IOException("Controller is down!")).when(mockMemberComms)
+        .sendMemberAborted(eq(spy), any(ForeignException.class));
+
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
+    memberSpy.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    memberSpy.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
+    // make sure we acquire.
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
+
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // make sure we pass a remote exception back the controller
+//    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
+//      any(ExternalException.class));
+//    order.verify(dispSpy).receiveError(anyString(),
+//        any(ExternalException.class), any());
+  }
+
+  /**
+   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
+   * correctly build a new task for the requested operation
+   * @throws Exception on failure
+   */
+  @Test
+  public void testNoTaskToBeRunFromRequest() throws Exception {
+    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
+        .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    // builder returns null
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // throws an illegal state exception
+    try {
+      // build a new operation
+      Subprocedure subproc2 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc2);
+      org.junit.Assert.fail("Expected the builder's IllegalStateException to propagate");
+    } catch (IllegalStateException expected) {
+      // expected: the builder is stubbed to throw this on its second call
+    }
+    // throws an illegal argument exception
+    try {
+      // build a new operation
+      Subprocedure subproc3 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc3);
+      org.junit.Assert.fail("Expected the builder's IllegalArgumentException to propagate");
+    } catch (IllegalArgumentException expected) {
+      // expected: the builder is stubbed to throw this on its third call
+    }
+
+    // no request should reach the pool
+    verifyZeroInteractions(pool);
+    // get two abort requests
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
+  }
+
+  /**
+   * Helper {@link Subprocedure} whose phases are all empty; it runs {@code op} with the class
+   * level {@code WAKE_FREQUENCY} and {@code TIMEOUT}.
+   */
+  public class EmptySubprocedure extends SubprocedureImpl {
+    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
+      super( member, op, dispatcher,
+          WAKE_FREQUENCY, TIMEOUT);
+    }
+  }
+}
\ No newline at end of file
Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.internal.matchers.ArrayEquals;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
+ */
+@Category(MediumTests.class)
+public class TestZKProcedure {
+
+ private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final String COORDINATOR_NODE_NAME = "coordinator";
+ private static final long KEEP_ALIVE = 100; // seconds
+ private static final int POOL_SIZE = 1;
+ private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
+ private static final long WAKE_FREQUENCY = 500;
+ private static final String opName = "op";
+ private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
+ private static final VerificationMode once = Mockito.times(1);
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
+ return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ throw new RuntimeException(
+ "Unexpected abort in distributed three phase commit test:" + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void testEmptyMemberSet() throws Exception {
+ runCommit();
+ }
+
+ @Test
+ public void testSingleMember() throws Exception {
+ runCommit("one");
+ }
+
+ @Test
+ public void testMultipleMembers() throws Exception {
+ runCommit("one", "two", "three", "four" );
+ }
+
+ private void runCommit(String... members) throws Exception {
+ // make sure we just have an empty list
+ if (members == null) {
+ members = new String[0];
+ }
+ List<String> expected = Arrays.asList(members);
+
+ // setup the constants
+ ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
+ String opDescription = "coordination test - " + members.length + " cohort members";
+
+ // start running the controller
+ ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
+ coordZkw, opDescription, COORDINATOR_NODE_NAME);
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+ ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
+ @Override
+ public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
+ List<String> expectedMembers) {
+ return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
+ }
+ };
+
+ // build and start members
+ // NOTE: There is a single subprocedure builder for all members here.
+ SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+ members.length);
+ // start each member
+ for (String member : members) {
+ ZooKeeperWatcher watcher = newZooKeeperWatcher();
+ ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+ ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
+ procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
+ comms.start(procMember);
+ }
+
+ // setup mock member subprocedures
+ final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
+ for (int i = 0; i < procMembers.size(); i++) {
+ ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+ Subprocedure commit = Mockito
+ .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
+ TIMEOUT, WAKE_FREQUENCY));
+ subprocs.add(commit);
+ }
+
+ // link subprocedure to buildNewOperation invocation.
+ final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
+ Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
+ (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+ new Answer<Subprocedure>() {
+ @Override
+ public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+ int index = i.getAndIncrement();
+ LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
+ Subprocedure commit = subprocs.get(index);
+ return commit;
+ }
+ });
+
+ // setup spying on the coordinator
+// Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
+// Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
+
+ // start running the operation
+ Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
+// assertEquals("Didn't mock coordinator task", proc, task);
+
+ // verify all things ran as expected
+// waitAndVerifyProc(proc, once, once, never(), once, false);
+ waitAndVerifyProc(task, once, once, never(), once, false);
+ verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
+
+ // close all the things
+ closeAll(coordinator, coordinatorComms, procMembers);
+ }
+
+ /**
+ * Test a distributed commit with multiple cohort members, where one of the cohort members has a
+ * timeout exception during the prepare stage.
+ */
+ @Test
+ public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
+ String opDescription = "error injection coordination";
+ String[] cohortMembers = new String[] { "one", "two", "three" };
+ List<String> expected = Lists.newArrayList(cohortMembers);
+ // error constants
+ final int memberErrorIndex = 2;
+ final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
+
+ // start running the coordinator and its controller
+ ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
+ ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
+ coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+ ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
+
+ // start a member for each node
+ SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+ expected.size());
+ for (String member : expected) {
+ ZooKeeperWatcher watcher = newZooKeeperWatcher();
+ ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+ ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
+ members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
+ controller.start(mem);
+ }
+
+ // setup mock subprocedures
+ final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
+ final int[] elem = new int[1];
+ for (int i = 0; i < members.size(); i++) {
+ ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+ ProcedureMember comms = members.get(i).getFirst();
+ Subprocedure commit = Mockito
+ .spy(new SubprocedureImpl(comms, opName, cohortMonitor, TIMEOUT, WAKE_FREQUENCY));
+ // This nasty bit has one of the impls throw a TimeoutException
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ int index = elem[0];
+ if (index == memberErrorIndex) {
+ LOG.debug("Sending error to coordinator");
+ ForeignException remoteCause = new ForeignException("TIMER",
+ new TimeoutException("subprocTimeout" , 1, 2, 0));
+ Subprocedure r = ((Subprocedure) invocation.getMock());
+ LOG.error("Remote commit failure, not propagating error:" + remoteCause);
+ r.monitor.receive(remoteCause);
+ // don't complete the error phase until the coordinator has gotten the error
+ // notification (which ensures that we never progress past prepare)
+ try {
+ Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
+ WAKE_FREQUENCY, "coordinator received error");
+ } catch (InterruptedException e) {
+ LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ }
+ }
+ elem[0] = ++index;
+ return null;
+ }
+ }).when(commit).acquireBarrier();
+ cohortTasks.add(commit);
+ }
+
+ // pass out a task per member
+ final int[] i = new int[] { 0 };
+ Mockito.when(
+ subprocFactory.buildSubprocedure(Mockito.eq(opName),
+ (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+ new Answer<Subprocedure>() {
+ @Override
+ public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+ int index = i[0];
+ Subprocedure commit = cohortTasks.get(index);
+ index++;
+ i[0] = index;
+ return commit;
+ }
+ });
+
+ // setup spying on the coordinator
+ ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
+ .spy(new ForeignExceptionDispatcher());
+ Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
+ coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
+ opName, data, expected));
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
+ .thenReturn(coordinatorTask);
+ // count down the error latch when we get the remote error
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // pass on the error to the master
+ invocation.callRealMethod();
+ // then count down the got error latch
+ coordinatorReceivedErrorLatch.countDown();
+ return null;
+ }
+ }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
+
+ // ----------------------------
+ // start running the operation
+ // ----------------------------
+
+ Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
+ assertEquals("Didn't mock coordinator task", coordinatorTask, task);
+
+ // wait for the task to complete
+ try {
+ task.waitForCompleted();
+ } catch (ForeignException fe) {
+ // this may get caught or may not
+ }
+
+ // -------------
+ // verification
+ // -------------
+ waitAndVerifyProc(coordinatorTask, once, never(), once, once, true);
+ verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
+ once, true);
+
+ // close all the open things
+ closeAll(coordinator, coordinatorController, members);
+ }
+
+ /**
+ * Wait for the coordinator task to complete, and verify all the mocks
+ * @param task to wait on
+ * @throws Exception on unexpected failure
+ */
+ private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
+ VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+ throws Exception {
+ boolean caughtError = false;
+ try {
+ proc.waitForCompleted();
+ } catch (ForeignException fe) {
+ caughtError = true;
+ }
+ // make sure that the task called all the expected phases
+ Mockito.verify(proc, prepare).sendGlobalBarrierStart();
+ Mockito.verify(proc, commit).sendGlobalBarrierReached();
+ Mockito.verify(proc, finish).sendGlobalBarrierComplete();
+ assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
+ .hasException());
+ assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+ }
+
+ /**
+ * Wait for the coordinator task to complete, and verify all the mocks
+ * @param task to wait on
+ * @throws Exception on unexpected failure
+ */
+ private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
+ VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+ throws Exception {
+ boolean caughtError = false;
+ try {
+ op.waitForLocallyCompleted();
+ } catch (ForeignException fe) {
+ caughtError = true;
+ }
+ // make sure that the task called all the expected phases
+ Mockito.verify(op, prepare).acquireBarrier();
+ Mockito.verify(op, commit).insideBarrier();
+ // We cannot guarantee that cleanup has run so we don't check it.
+
+ assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
+ .hasException());
+ assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+ }
+
+ private void verifyCohortSuccessful(List<String> cohortNames,
+ SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
+ VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
+ VerificationMode finish, boolean opHasError) throws Exception {
+
+ // make sure we build the correct number of cohort members
+ Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
+ Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
+ // verify that we ran each of the operations cleanly
+ int j = 0;
+ for (Subprocedure op : cohortTasks) {
+ LOG.debug("Checking mock:" + (j++));
+ waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
+ }
+ }
+
+ private void closeAll(
+ ProcedureCoordinator coordinator,
+ ZKProcedureCoordinatorRpcs coordinatorController,
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
+ throws IOException {
+ // make sure we close all the resources
+ for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
+ member.getFirst().close();
+ member.getSecond().close();
+ }
+ coordinator.close();
+ coordinatorController.close();
+ }
+}