You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/03/04 12:24:53 UTC

svn commit: r1452257 [12/14] - in /hbase/branches/0.94: security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/ src/main/jamon/org/apache/hadoop/hbase/tmpl/master/ src/main/java/or...

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Demonstrate how Procedure handles single members, multiple members, and error semantics
+ */
+@Category(SmallTests.class)
+public class TestProcedure {
+
+  ProcedureCoordinator coord;
+
+  @Before
+  public void setup() {
+    coord = mock(ProcedureCoordinator.class);
+    final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
+    when(coord.getRpcs()).thenReturn(comms); // make it not null
+  }
+
+  class LatchedProcedure extends Procedure {
+    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
+    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
+    CountDownLatch completedProcedure = new CountDownLatch(1);
+
+    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
+        long wakeFreq, long timeout, String opName, byte[] data,
+        List<String> expectedMembers) {
+      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
+    }
+
+    @Override
+    public void sendGlobalBarrierStart() {
+      startedAcquireBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierReached() {
+      startedDuringBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierComplete() {
+      completedProcedure.countDown();
+    }
+  };
+
+  /**
+   * With a single member, verify ordered execution.  The Coordinator side is run in a separate
+   * thread so we can only trigger from members and wait for particular state latches.
+   */
+  @Test(timeout = 1000)
+  public void testSingleMember() throws Exception {
+    // The member
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // coordinator: start the barrier procedure
+    new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    }.start();
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    proc.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called; others are blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString());
+
+    // member: trigger global barrier acquisition
+    proc.barrierAcquiredByMember(members.get(0));
+
+    // coordinator: wait for global barrier to be acquired.
+    proc.acquiredBarrierLatch.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // with two threads, we cannot guarantee that {@link Procedure#sendGlobalBarrierReached()}
+    // was or was not called here.
+
+    // member: trigger global barrier release
+    proc.barrierReleasedByMember(members.get(0));
+
+    // coordinator: wait for procedure to be completed
+    proc.completedProcedure.await();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout=1000)
+  public void testMultipleMember() throws Exception {
+    // 2 members
+    List<String> members =  new ArrayList<String>();
+    members.add("member1");
+    members.add("member2");
+
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // start the barrier procedure
+    new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    }.start();
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    procspy.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called; others are blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
+
+    // member0: [1/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(0));
+
+    // coordinator not satisfied.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+
+    // member 1: [2/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(1));
+
+    // coordinator: wait for global barrier to be acquired.
+    procspy.startedDuringBarrier.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // member 1, 2: trigger global barrier release
+    procspy.barrierReleasedByMember(members.get(0));
+    procspy.barrierReleasedByMember(members.get(1));
+
+    // coordinator wait for procedure to be completed
+    procspy.completedProcedure.await();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testErrorPropagation() throws Exception {
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final Procedure procspy = spy(proc);
+
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    proc.receive(cause);
+
+    // start the barrier procedure
+    Thread t = new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    };
+    t.start();
+    t.join();
+
+    verify(procspy, never()).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+  }
+
+  @Test(timeout = 1000)
+  public void testBarrieredErrorPropagation() throws Exception {
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+
+    // start the barrier procedure
+    Thread t = new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    };
+    t.start();
+
+    // now test that we can put an error in before the commit phase runs
+    procspy.startedAcquireBarrier.await();
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    procspy.receive(cause);
+    procspy.barrierAcquiredByMember(members.get(0));
+    t.join();
+
+    // verify state of all the object
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).sendGlobalBarrierReached();
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test Procedure coordinator operation.
+ * <p>
+ * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
+ * level parallelization this class will likely throw all kinds of errors.
+ */
+@Category(SmallTests.class)
+public class TestProcedureCoordinator {
+  // general test constants
+  private static final long WAKE_FREQUENCY = 1000;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+  private static final String nodeName = "node";
+  private static final String procName = "some op";
+  private static final byte[] procData = new byte[0];
+  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
+
+  // setup the mocks
+  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
+  private final Procedure task = mock(Procedure.class);
+  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
+
+  // handle to the coordinator for each test
+  private ProcedureCoordinator coordinator;
+
+  @After
+  public void resetTest() throws IOException {
+    // reset all the mocks used for the tests
+    reset(controller, task, monitor);
+    // close the open coordinator, if it was used
+    if (coordinator != null) coordinator.close();
+  }
+
+  private ProcedureCoordinator buildNewCoordinator() {
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+    return spy(new ProcedureCoordinator(controller, pool));
+  }
+
+  /**
+   * Currently we can only handle one procedure at a time.  This makes sure we handle that and
+   * reject submitting more.
+   */
+  @Test
+  public void testThreadPoolSize() throws Exception {
+    coordinator = buildNewCoordinator(); // use the field so resetTest() closes this pool too
+    Procedure proc = new Procedure(coordinator,  monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
+    Procedure procSpy = spy(proc);
+
+    Procedure proc2 = new Procedure(coordinator,  monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
+    Procedure procSpy2 = spy(proc2);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+    .thenReturn(procSpy, procSpy2);
+
+    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
+    // null here means second procedure failed to start.
+    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
+      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the acquiring phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringPrepare() throws Exception {
+    coordinator = buildNewCoordinator();
+    // setup the proc
+    List<String> expected = Arrays.asList("cohort");
+    Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, expected);
+    final Procedure procSpy = spy(proc);
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(procSpy);
+
+    // use the passed controller responses
+    IOException cause = new IOException("Failed to reach comms during acquire");
+    doThrow(cause).when(controller)
+        .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
+
+    // run the operation
+    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    proc.waitForCompleted();
+    verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
+    verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the barrier phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringCommit() throws Exception {
+    coordinator = buildNewCoordinator();
+
+    // setup the task and spy on it
+    List<String> expected = Arrays.asList("cohort");
+    final Procedure spy = spy(new Procedure(coordinator,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+    .thenReturn(spy);
+
+    // use the passed controller responses
+    IOException cause = new IOException("Failed to reach controller during prepare");
+    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
+        .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
+    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+    verify(spy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
+        eq(procData), anyListOf(String.class));
+    verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testNoCohort() throws Exception {
+    runSimpleProcedure();
+  }
+
+  @Test(timeout = 1000)
+  public void testSingleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one");
+  }
+
+  @Test(timeout = 1000)
+  public void testMultipleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one", "two", "three", "four");
+  }
+
+  public void runSimpleProcedure(String... members) throws Exception {
+    coordinator = buildNewCoordinator();
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(members));
+    final Procedure spy = spy(task);
+    runCoordinatedProcedure(spy, members);
+  }
+
+  /**
+   * Test that if nodes join the barrier early we still correctly handle the progress
+   */
+  @Test(timeout = 1000)
+  public void testEarlyJoiningBarrier() throws Exception {
+    final String[] cohort = new String[] { "one", "two", "three", "four" };
+    coordinator = buildNewCoordinator();
+    final ProcedureCoordinator ref = coordinator;
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(cohort));
+    final Procedure spy = spy(task);
+
+    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
+      public void doWork() {
+        // then do some fun where we commit before all nodes have prepared
+        // "one" commits before anyone else is done
+        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[0]);
+        // but "two" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
+        // "three" jumps ahead
+        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[2]);
+        // and "four" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
+      }
+    };
+
+    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
+      @Override
+      public void doWork() {
+        ref.memberFinishedBarrier(opName, this.cohort[1]);
+        ref.memberFinishedBarrier(opName, this.cohort[3]);
+      }
+    };
+    runCoordinatedOperation(spy, prepare, commit, cohort);
+  }
+
+  /**
+   * Just run a procedure with the standard name and data, with no special task for the mock
+   * coordinator (it works just like a regular coordinator). For custom behavior see
+   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
+   * .
+   * @param spy Spy on a real {@link Procedure}
+   * @param cohort expected cohort members
+   * @throws Exception on failure
+   */
+  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
+      new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
+      BarrierAnswer commitOperation, String... cohort) throws Exception {
+    List<String> expected = Arrays.asList(cohort);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+      .thenReturn(spy);
+
+    // use the passed controller responses
+    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
+    doAnswer(commitOperation).when(controller)
+        .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+
+    // make sure we mocked correctly
+    prepareOperation.ensureRan();
+    // we never got an exception
+    InOrder inorder = inOrder(spy, controller);
+    inorder.verify(spy).sendGlobalBarrierStart();
+    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
+    inorder.verify(spy).sendGlobalBarrierReached();
+    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
+  }
+
+  private abstract class OperationAnswer implements Answer<Void> {
+    private boolean ran = false;
+
+    public void ensureRan() {
+      assertTrue("Prepare mocking didn't actually run!", ran);
+    }
+
+    @Override
+    public final Void answer(InvocationOnMock invocation) throws Throwable {
+      this.ran = true;
+      doWork();
+      return null;
+    }
+
+    protected abstract void doWork() throws Throwable;
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has prepared
+   */
+  private class AcquireBarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public AcquireBarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
+      }
+    }
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has committed
+   */
+  private class BarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public BarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the procedure member, and its error handling mechanisms.
+ */
+@Category(SmallTests.class)
+public class TestProcedureMember {
+  private static final long WAKE_FREQUENCY = 100;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+
+  private final String op = "some op";
+  private final byte[] data = new byte[0];
+  private final ForeignExceptionDispatcher mockListener = Mockito
+      .spy(new ForeignExceptionDispatcher());
+  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
+  private final ProcedureMemberRpcs mockMemberComms = Mockito
+      .mock(ProcedureMemberRpcs.class);
+  private ProcedureMember member;
+  private ForeignExceptionDispatcher dispatcher;
+  Subprocedure spySub;
+
+  /**
+   * Reset all the mock objects
+   */
+  @After
+  public void resetTest() {
+    reset(mockListener, mockBuilder, mockMemberComms);
+    if (member != null)
+      try {
+        member.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+  }
+
+  /**
+   * Build a member using the class level mocks
+   * @return member to use for tests
+   */
+  private ProcedureMember buildCohortMember() {
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
+  }
+
+  /**
+   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
+   */
+  private void buildCohortMemberPair() throws IOException {
+    dispatcher = new ForeignExceptionDispatcher();
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
+    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
+    spySub = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
+    addCommitAnswer();
+  }
+
+
+  /**
+   * Add an 'in barrier phase' response to the mock controller on an acquired notification
+   */
+  private void addCommitAnswer() throws IOException {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        member.receivedReachedGlobalBarrier(op);
+        return null;
+      }
+    }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+  }
+
+  /**
+   * Test the normal sub procedure execution case.
+   */
+  @Test(timeout = 500)
+  public void testSimpleRun() throws Exception {
+    member = buildCohortMember();
+    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
+    EmptySubprocedure spy = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // when we get a prepare, then start the commit phase
+    addCommitAnswer();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc1 = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc1);
+    // and wait for it to finish
+    subproc.waitForLocallyCompleted();
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy);
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
+    order.verify(spy).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
+    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
+        any(ForeignException.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when we have an exception during
+   * {@link Subprocedure#acquireBarrier()}.
+   * <p>
+   * The spied subprocedure is stubbed so that {@code acquireBarrier()} throws an
+   * {@link IOException}. The test then checks that none of the later phases ran and that
+   * {@code cancel} and {@code cleanup} were invoked on the subprocedure.
+   */
+  @Test(timeout = 1000)
+  public void testMemberPrepareException() throws Exception {
+    // NOTE(review): presumably initializes the shared member/spySub fixtures used below; the
+    // helper is defined outside this section.
+    buildCohortMemberPair();
+
+    // make the subprocedure's acquireBarrier() fail; doAnswer is used so the stub can throw an
+    // arbitrary Throwable from the answer body
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in member acquireBarrier");
+          }
+        }).when(spySub).acquireBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when notifying the coordinator that the member has acquired
+   * the barrier fails.
+   * <p>
+   * {@code acquireBarrier()} itself is left untouched; the failure is injected into
+   * {@code sendMemberAcquired} on the mocked member comms. The test checks that the later phases
+   * are skipped and that {@code cancel} and {@code cleanup} are invoked.
+   */
+  @Test(timeout = 1000)
+  public void testSendMemberAcquiredCommsFailure() throws Exception {
+    // NOTE(review): presumably initializes the shared member/spySub fixtures used below; the
+    // helper is defined outside this section.
+    buildCohortMemberPair();
+
+    // mock an exception when the member reports "acquired" to the coordinator
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    // the send was attempted (and threw)
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Fail correctly if the coordinator aborts the procedure.
+   * <p>
+   * The abort is injected from the stubbed {@code waitForReachedGlobalBarrier()}, i.e. only after
+   * {@link Subprocedure#acquireBarrier()} has finished and the member has reported that it
+   * acquired the barrier. Thus {@code acquireBarrier()} succeeds, but the subprocedure must not
+   * enter the barrier and is expected to be rolled back via {@code cancel} and {@code cleanup}.
+   */
+  @Test(timeout = 1000)
+  public void testCoordinatorAbort() throws Exception {
+    // NOTE(review): presumably initializes the shared member/spySub fixtures used below; the
+    // helper is defined outside this section.
+    buildCohortMemberPair();
+
+    // mock that another node timed out or failed to prepare
+    final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("bogus message", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(spySub).waitForReachedGlobalBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle failures if a member's commit phase fails.
+   * <p>
+   * The failure is injected by making {@code insideBarrier()} on the spied subprocedure throw.
+   * The test checks that completion is never reported and that {@code cancel} and
+   * {@code cleanup} are invoked.
+   * <p>
+   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member.  Members are then responsible for reading its TX log.  This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+   */
+  @Test(timeout = 1000)
+  public void testMemberCommitException() throws Exception {
+    // NOTE(review): presumably initializes the shared member/spySub fixtures used below; the
+    // helper is defined outside this section.
+    buildCohortMemberPair();
+
+    // mock an exception in the subprocedure's insideBarrier (commit) step
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(spySub).insideBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    // insideBarrier was attempted (and threw)
+    order.verify(spySub).insideBarrier();
+
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle failures if a member's commit phase succeeds but notification to the coordinator
+   * fails.
+   * <p>
+   * The failure is simulated inside the stubbed {@code sendMemberCompleted}: the answer cancels
+   * the spied subprocedure with a {@link TimeoutException} instead of throwing. The test checks
+   * that every phase up to and including {@code sendMemberCompleted} ran, followed by
+   * {@code cancel} and {@code cleanup}.
+   * <p>
+   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member.  Members are then responsible for reading its TX log.  This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+   */
+  @Test(timeout = 1000)
+  public void testMemberCommitCommsFailure() throws Exception {
+    // NOTE(review): presumably initializes the shared member/spySub fixtures used below; the
+    // helper is defined outside this section.
+    buildCohortMemberPair();
+    final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("commit comms fail", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    order.verify(spySub).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Exercise the case where the subprocedure fails in {@code acquireBarrier()} and the member
+   * comms additionally fail when asked to report the abort.
+   * <p>
+   * Currently the test only checks that {@code acquireBarrier()} was called and that
+   * {@code sendMemberAcquired} was not; the checks for error propagation are commented out
+   * pending a refactor (see the TODO at the end of the method).
+   * @throws Exception on failure
+   */
+  @Test(timeout = 1000)
+  public void testPropagateConnectionErrorBackToManager() throws Exception {
+    // setup the operation
+    member = buildCohortMember();
+    ProcedureMember memberSpy = spy(member);
+
+    // setup the commit and the spy
+    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
+    // NOTE(review): the subprocedure below is constructed with 'dispatcher', not 'dispSpy', so
+    // calls made through the subprocedure's own reference will not be recorded on dispSpy --
+    // confirm before re-enabling the commented-out dispSpy verification.
+    ForeignExceptionDispatcher dispSpy = spy(dispatcher);
+    Subprocedure commit = new EmptySubprocedure(member, dispatcher);
+    Subprocedure spy = spy(commit);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // fail during the prepare phase
+    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
+    // and throw a connection error when we try to tell the controller about it
+    doThrow(new IOException("Controller is down!")).when(mockMemberComms)
+        .sendMemberAborted(eq(spy), any(ForeignException.class));
+
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
+    memberSpy.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    memberSpy.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
+    // make sure we acquire.
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
+
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // make sure we pass a remote exception back the controller
+//    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
+//      any(ExternalException.class));
+//    order.verify(dispSpy).receiveError(anyString(),
+//        any(ExternalException.class), any());
+  }
+
+  /**
+   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
+   * correctly build a new task for the requested operation.
+   * <p>
+   * The mocked builder is stubbed to return {@code null} on the first call, then throw an
+   * {@link IllegalStateException}, then an {@link IllegalArgumentException}. In none of the three
+   * cases may the (mocked) thread pool be touched.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testNoTaskToBeRunFromRequest() throws Exception {
+    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
+    // consecutive stubbing: 1st call -> null, 2nd -> IllegalStateException,
+    // 3rd -> IllegalArgumentException
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
+      .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    // builder returns null
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // throws an illegal state exception
+    // NOTE(review): the catch blocks below are intentionally empty, but nothing fails the test
+    // if the expected exception is NOT thrown -- consider adding an explicit failure after the
+    // submit calls once it is confirmed that createSubprocedure propagates builder exceptions.
+    try {
+      // build a new operation
+      Subprocedure subproc2 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc2);
+    } catch (IllegalStateException ise) {
+      // expected: injected by the stubbed builder
+    }
+    // throws an illegal argument exception
+    try {
+      // build a new operation
+      Subprocedure subproc3 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc3);
+    } catch (IllegalArgumentException iae) {
+      // expected: injected by the stubbed builder
+    }
+
+    // no request should reach the pool
+    verifyZeroInteractions(pool);
+    // get two abort requests
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
+  }
+
+  /**
+   * Helper {@link Subprocedure} that adds no behavior of its own on top of
+   * {@link SubprocedureImpl}; it only binds the test's operation name, wake frequency and
+   * timeout so tests can create instances from a member and an error dispatcher.
+   */
+  public class EmptySubprocedure extends SubprocedureImpl {
+    /**
+     * @param member the procedure member the subprocedure belongs to
+     * @param dispatcher error dispatcher handed to the underlying {@link SubprocedureImpl}
+     */
+    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
+      // wake frequency and timeout are the shared test constants
+      super(member, op, dispatcher, WAKE_FREQUENCY, TIMEOUT);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.internal.matchers.ArrayEquals;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
+ */
+@Category(MediumTests.class)
+public class TestZKProcedure {
+
+  private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
+  // shared testing utility; used to start/stop the mini ZK cluster and to supply configuration
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final String COORDINATOR_NODE_NAME = "coordinator";
+  private static final long KEEP_ALIVE = 100; // seconds
+  // number of threads in the coordinator's pool
+  private static final int POOL_SIZE = 1;
+  private static final long TIMEOUT = 10000; // make this larger when debugging
+  private static final long WAKE_FREQUENCY = 500;
+  private static final String opName = "op";
+  // opaque procedure arguments: passed to startProcedure and matched when stubbing
+  // SubprocedureFactory.buildSubprocedure
+  private static final byte[] data = new byte[] { 1, 2 };
+  // shorthand verification mode: exactly one invocation
+  private static final VerificationMode once = Mockito.times(1);
+
+  /** Start the mini ZooKeeper cluster shared by all tests in this class. */
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    UTIL.startMiniZKCluster();
+  }
+
+  /** Shut down the mini ZooKeeper cluster once all tests in this class have run. */
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Create a new watcher against the test configuration. Any abort requested through the
+   * watcher's {@link Abortable} is turned into a {@link RuntimeException}.
+   * @return a fresh {@link ZooKeeperWatcher}
+   * @throws IOException if the watcher cannot be created
+   */
+  private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
+    // an abort is never expected in these tests, so surface it loudly
+    Abortable failOnAbort = new Abortable() {
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+
+      @Override
+      public void abort(String why, Throwable e) {
+        String msg = "Unexpected abort in distributed three phase commit test:" + why;
+        throw new RuntimeException(msg, e);
+      }
+    };
+    return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", failOnAbort);
+  }
+
+  /** Run a procedure with no cohort members (empty varargs list). */
+  @Test
+  public void testEmptyMemberSet() throws Exception {
+    runCommit();
+  }
+
+  /** Run a procedure with a single cohort member. */
+  @Test
+  public void testSingleMember() throws Exception {
+    runCommit("one");
+  }
+
+  /** Run a procedure with four cohort members. */
+  @Test
+  public void testMultipleMembers() throws Exception {
+    runCommit("one", "two", "three", "four" );
+  }
+
+  /**
+   * Run one procedure against the given cohort members and verify the successful path.
+   * <p>
+   * Sets up a coordinator whose procedures are Mockito spies, one {@link ProcedureMember} (with
+   * its own watcher and thread pool) per name, and one spied {@link SubprocedureImpl} per member
+   * handed out by a shared mock {@link SubprocedureFactory}. It then starts the procedure,
+   * verifies the coordinator and every subprocedure, and closes everything.
+   * @param members names of the cohort members; may be empty
+   * @throws Exception on unexpected failure
+   */
+  private void runCommit(String... members) throws Exception {
+    // make sure we just have an empty list
+    if (members == null) {
+      members = new String[0];
+    }
+    List<String> expected = Arrays.asList(members);
+
+    // setup the constants
+    ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
+    String opDescription = "coordination test - " + members.length + " cohort members";
+
+    // start running the controller
+    ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
+        coordZkw, opDescription, COORDINATOR_NODE_NAME);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    // wrap every created procedure in a spy so its phases can be verified afterwards
+    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
+      @Override
+      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
+          List<String> expectedMembers) {
+        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
+      }
+    };
+
+    // build and start members
+    // NOTE: There is a single subprocedure builder for all members here.
+    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+        members.length);
+    // start each member
+    for (String member : members) {
+      ZooKeeperWatcher watcher = newZooKeeperWatcher();
+      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
+      procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
+      comms.start(procMember);
+    }
+
+    // setup mock member subprocedures
+    final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
+    for (int i = 0; i < procMembers.size(); i++) {
+      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+      Subprocedure commit = Mockito
+      .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
+          WAKE_FREQUENCY, TIMEOUT));
+      subprocs.add(commit);
+    }
+
+    // link subprocedure to buildNewOperation invocation.
+    // each call to the shared factory hands out the next prepared subprocedure
+    final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
+    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
+        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+      new Answer<Subprocedure>() {
+        @Override
+        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+          int index = i.getAndIncrement();
+          LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
+          Subprocedure commit = subprocs.get(index);
+          return commit;
+        }
+      });
+
+    // setup spying on the coordinator
+//    Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
+//    Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
+
+    // start running the operation
+    Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
+//    assertEquals("Didn't mock coordinator task", proc, task);
+
+    // verify all things ran as expected
+//    waitAndVerifyProc(proc, once, once, never(), once, false);
+    waitAndVerifyProc(task, once, once, never(), once, false);
+    verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
+
+    // close all the things
+    closeAll(coordinator, coordinatorComms, procMembers);
+  }
+
+  /**
+   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
+   * timeout exception during the prepare stage.
+   * <p>
+   * Three members are started against a spied coordinator. The {@code acquireBarrier()} of every
+   * spied subprocedure is stubbed; the invocation that observes counter value
+   * {@code memberErrorIndex} injects a {@link ForeignException} wrapping a
+   * {@link TimeoutException} into its own monitor and then blocks until the coordinator's
+   * procedure has received an error. The test then verifies that the coordinator and all
+   * subprocedures ended in the error state without running the commit phase.
+   * <p>
+   * The two invocation counters are {@link AtomicInteger}s because every member has its own
+   * watcher and executor, so the stubbed answers may be invoked from different threads; a lost
+   * update on a plain counter could hand out the same subprocedure twice or skip the error
+   * injection altogether.
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
+    String opDescription = "error injection coordination";
+    String[] cohortMembers = new String[] { "one", "two", "three" };
+    List<String> expected = Lists.newArrayList(cohortMembers);
+    // error constants
+    final int memberErrorIndex = 2;
+    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
+
+    // start running the coordinator and its controller
+    ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
+    ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
+        coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
+
+    // start a member for each node
+    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+        expected.size());
+    for (String member : expected) {
+      ZooKeeperWatcher watcher = newZooKeeperWatcher();
+      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
+      members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
+      controller.start(mem);
+    }
+
+    // setup mock subprocedures
+    final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
+    // counts acquireBarrier() invocations across all members; atomic since the stubbed answers
+    // may run on different members' threads
+    final AtomicInteger acquireCount = new AtomicInteger(0);
+    for (int i = 0; i < members.size(); i++) {
+      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+      ProcedureMember comms = members.get(i).getFirst();
+      Subprocedure commit = Mockito
+      .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
+      // This nasty bit has one of the impls throw a TimeoutException
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // claim a unique invocation index; exactly one invocation sees memberErrorIndex
+          int index = acquireCount.getAndIncrement();
+          if (index == memberErrorIndex) {
+            LOG.debug("Sending error to coordinator");
+            ForeignException remoteCause = new ForeignException("TIMER",
+                new TimeoutException("subprocTimeout" , 1, 2, 0));
+            Subprocedure r = ((Subprocedure) invocation.getMock());
+            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
+            r.monitor.receive(remoteCause);
+            // don't complete the error phase until the coordinator has gotten the error
+            // notification (which ensures that we never progress past prepare)
+            try {
+              Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
+                  WAKE_FREQUENCY, "coordinator received error");
+            } catch (InterruptedException e) {
+              LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
+              // reset the interrupt status on the thread
+              Thread.currentThread().interrupt();
+            }
+          }
+          return null;
+        }
+      }).when(commit).acquireBarrier();
+      cohortTasks.add(commit);
+    }
+
+    // pass out a task per member; atomic counter so that concurrent build requests from
+    // different members never receive the same subprocedure
+    final AtomicInteger buildCount = new AtomicInteger(0);
+    Mockito.when(
+      subprocFactory.buildSubprocedure(Mockito.eq(opName),
+        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+      new Answer<Subprocedure>() {
+        @Override
+        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+          return cohortTasks.get(buildCount.getAndIncrement());
+        }
+      });
+
+    // setup spying on the coordinator
+    ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
+        .spy(new ForeignExceptionDispatcher());
+    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
+        coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
+        opName, data, expected));
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
+      .thenReturn(coordinatorTask);
+    // count down the error latch when we get the remote error
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // pass on the error to the master
+        invocation.callRealMethod();
+        // then count down the got error latch
+        coordinatorReceivedErrorLatch.countDown();
+        return null;
+      }
+    }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
+
+    // ----------------------------
+    // start running the operation
+    // ----------------------------
+
+    Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
+    assertEquals("Didn't mock coordinator task", coordinatorTask, task);
+
+    // wait for the task to complete
+    try {
+      task.waitForCompleted();
+    } catch (ForeignException fe) {
+      // this may get caught or may not
+    }
+
+    // -------------
+    // verification
+    // -------------
+    waitAndVerifyProc(coordinatorTask, once, never(), once, once, true);
+    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
+      once, true);
+
+    // close all the open things
+    closeAll(coordinator, coordinatorController, members);
+  }
+
+  /**
+   * Wait for the coordinator-side procedure to complete, then verify its mocked phases and its
+   * error state.
+   * @param proc spied procedure to wait on and verify
+   * @param prepare expected invocations of {@code sendGlobalBarrierStart()}
+   * @param commit expected invocations of {@code sendGlobalBarrierReached()}
+   * @param cleanup currently not checked by this method
+   * @param finish expected invocations of {@code sendGlobalBarrierComplete()}
+   * @param opHasError whether the procedure is expected to have ended with an error
+   * @throws Exception on unexpected failure
+   */
+  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
+      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+      throws Exception {
+    // block until the procedure is done, remembering whether it finished with an error
+    boolean sawForeignException;
+    try {
+      proc.waitForCompleted();
+      sawForeignException = false;
+    } catch (ForeignException fe) {
+      sawForeignException = true;
+    }
+
+    // each phase must have been invoked the expected number of times
+    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
+    Mockito.verify(proc, commit).sendGlobalBarrierReached();
+    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
+
+    // both the monitor and the wait must agree with the expected error state
+    boolean monitorHasError = proc.getErrorMonitor().hasException();
+    assertEquals("Operation error state was unexpected", opHasError, monitorHasError);
+    assertEquals("Operation error state was unexpected", opHasError, sawForeignException);
+  }
+
+  /**
+   * Wait for a member-side subprocedure to complete locally, then verify its mocked phases and
+   * its error state.
+   * @param op spied subprocedure to wait on and verify
+   * @param prepare expected invocations of {@code acquireBarrier()}
+   * @param commit expected invocations of {@code insideBarrier()}
+   * @param cleanup currently not checked (cleanup is not guaranteed to have run yet)
+   * @param finish currently not checked by this method
+   * @param opHasError whether the subprocedure is expected to have ended with an error
+   * @throws Exception on unexpected failure
+   */
+  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
+      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+      throws Exception {
+    // block until the subprocedure is done locally, remembering whether it ended with an error
+    boolean sawForeignException;
+    try {
+      op.waitForLocallyCompleted();
+      sawForeignException = false;
+    } catch (ForeignException fe) {
+      sawForeignException = true;
+    }
+
+    // each phase must have been invoked the expected number of times
+    Mockito.verify(op, prepare).acquireBarrier();
+    Mockito.verify(op, commit).insideBarrier();
+    // We cannot guarantee that cleanup has run so we don't check it.
+
+    // both the error checkable and the wait must agree with the expected error state
+    boolean checkableHasError = op.getErrorCheckable().hasException();
+    assertEquals("Operation error state was unexpected", opHasError, checkableHasError);
+    assertEquals("Operation error state was unexpected", opHasError, sawForeignException);
+  }
+
+  /**
+   * Verify that the factory built one subprocedure per cohort member and that every given
+   * subprocedure passes {@link #waitAndVerifySubproc}.
+   * @param cohortNames names of the expected cohort members
+   * @param subprocFactory mocked factory whose build invocations are counted
+   * @param cohortTasks spied subprocedures to check
+   * @param prepare passed through to {@link #waitAndVerifySubproc}
+   * @param commit passed through to {@link #waitAndVerifySubproc}
+   * @param cleanup passed through to {@link #waitAndVerifySubproc}
+   * @param finish passed through to {@link #waitAndVerifySubproc}
+   * @param opHasError whether each subprocedure is expected to have ended with an error
+   * @throws Exception on unexpected failure
+   */
+  private void verifyCohortSuccessful(List<String> cohortNames,
+      SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
+      VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
+      VerificationMode finish, boolean opHasError) throws Exception {
+
+    // one build request per cohort member, each with the expected name and arguments
+    VerificationMode oncePerMember = Mockito.times(cohortNames.size());
+    Mockito.verify(subprocFactory, oncePerMember).buildSubprocedure(
+      Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
+
+    // then check every subprocedure individually
+    int mockIndex = 0;
+    for (Subprocedure subproc : cohortTasks) {
+      LOG.debug("Checking mock:" + mockIndex);
+      mockIndex++;
+      waitAndVerifySubproc(subproc, prepare, commit, cleanup, finish, opHasError);
+    }
+  }
+
+  /**
+   * Close every cohort member together with its rpcs, then the coordinator and its rpcs.
+   * @param coordinator coordinator to close
+   * @param coordinatorController coordinator rpcs to close
+   * @param cohort (member, member rpcs) pairs to close
+   * @throws IOException if closing any of the resources fails
+   */
+  private void closeAll(
+      ProcedureCoordinator coordinator,
+      ZKProcedureCoordinatorRpcs coordinatorController,
+      List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
+      throws IOException {
+    // members first: each member, then the rpcs it was started with
+    for (Pair<ProcedureMember, ZKProcedureMemberRpcs> memberAndRpcs : cohort) {
+      ProcedureMember procMember = memberAndRpcs.getFirst();
+      procMember.close();
+      ZKProcedureMemberRpcs memberRpcs = memberAndRpcs.getSecond();
+      memberRpcs.close();
+    }
+
+    // then the coordinator side
+    coordinator.close();
+    coordinatorController.close();
+  }
+}

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test zookeeper-based, procedure controllers
+ */
+@Category(MediumTests.class)
+public class TestZKProcedureControllers {
+
+  static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
+  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final String COHORT_NODE_NAME = "expected";
+  private static final String CONTROLLER_NODE_NAME = "controller";
+  private static final VerificationMode once = Mockito.times(1);
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    UTIL.startMiniZKCluster(); // started once for the class; cleanupTest() shuts it down
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    UTIL.shutdownMiniZKCluster(); // counterpart of the startMiniZKCluster() in setupTest()
+  }
+
+  /**
+   * Smaller test of only the cohort member: barrier znodes are written by hand, callbacks mocked
+   * @throws Exception on failure
+   */
+  @Test(timeout = 15000)
+  public void testSimpleZKCohortMemberController() throws Exception {
+    ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+    final String operationName = "instanceTest";
+
+    final Subprocedure sub = Mockito.mock(Subprocedure.class);
+    Mockito.when(sub.getName()).thenReturn(operationName);
+
+    final byte[] data = new byte[] { 1, 2, 3 };
+    final CountDownLatch prepared = new CountDownLatch(1);
+    final CountDownLatch committed = new CountDownLatch(1);
+
+    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
+    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
+        watcher, "testSimple", COHORT_NODE_NAME);
+
+    // mock out cohort member callbacks; each reports via the controller, then releases a latch
+    final ProcedureMember member = Mockito
+        .mock(ProcedureMember.class);
+    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        controller.sendMemberAcquired(sub);
+        prepared.countDown();
+        return null;
+      }
+    }).when(member).submitSubprocedure(sub);
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        controller.sendMemberCompleted(sub);
+        committed.countDown();
+        return null;
+      }
+    }).when(member).receivedReachedGlobalBarrier(operationName);
+
+    // start running the listener
+    controller.start(member);
+
+    // set a prepare node from a 'coordinator'
+    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
+    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
+    // wait for the operation to be prepared (no await timeout; the @Test timeout bounds it)
+    prepared.await();
+
+    // create the commit node so we update the operation to enter the commit phase
+    String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
+    LOG.debug("Found prepared, posting commit node:" + commit);
+    ZKUtil.createAndFailSilent(watcher, commit);
+    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
+    committed.await();
+    // NOTE(review): monitor is never handed to controller/member, so this check looks vacuous
+    verify(monitor, never()).receive(Mockito.any(ForeignException.class));
+    // XXX: broken due to composition.
+//    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
+//      Mockito.any(IOException.class));
+    // cleanup after the test
+    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
+    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
+    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
+  }
+
+  @Test(timeout = 15000)
+  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
+    final String procName = "no cohort controller test";
+    final byte[] payload = new byte[] { 1, 2, 3 };
+    // same scenario under both start orders, with an empty cohort
+    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, procName, payload);
+    runMockCommitWithOrchestratedControllers(startCohortFirst, procName, payload);
+  }
+
+  @Test(timeout = 15000)
+  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
+    final String procName = "single member controller test";
+    final byte[] payload = new byte[] { 1, 2, 3 };
+    // same scenario under both start orders, with exactly one cohort member
+    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, procName, payload, "cohort");
+    runMockCommitWithOrchestratedControllers(startCohortFirst, procName, payload, "cohort");
+  }
+
+  @Test(timeout = 15000)
+  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
+    final String procName = "multi member controller test";
+    final byte[] payload = new byte[] { 1, 2, 3 };
+    // same scenario under both start orders, with three cohort members
+    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, procName, payload,
+      "cohort", "cohort2", "cohort3");
+    runMockCommitWithOrchestratedControllers(startCohortFirst, procName, payload,
+      "cohort", "cohort2", "cohort3");
+  }
+
+  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
+      String operationName, byte[] data, String... cohort) throws Exception {
+    ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+    List<String> expected = Lists.newArrayList(cohort);
+    // drives one acquire -> reached -> reset cycle through the started rpcs with mocked endpoints
+    final Subprocedure sub = Mockito.mock(Subprocedure.class);
+    Mockito.when(sub.getName()).thenReturn(operationName);
+
+    CountDownLatch prepared = new CountDownLatch(expected.size());
+    CountDownLatch committed = new CountDownLatch(expected.size());
+    // mock out coordinator so we can keep track of zk progress
+    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
+      prepared, committed);
+
+    ProcedureMember member = Mockito.mock(ProcedureMember.class);
+
+    Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
+        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
+    ZKProcedureCoordinatorRpcs controller = pair.getFirst();
+    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
+    // start the operation
+    Procedure p = Mockito.mock(Procedure.class);
+    Mockito.when(p.getName()).thenReturn(operationName);
+
+    controller.sendGlobalBarrierAcquire(p, data, expected);
+
+    // post the prepare node for each expected node
+    for (ZKProcedureMemberRpcs cc : cohortControllers) {
+      cc.sendMemberAcquired(sub);
+    }
+
+    // wait for all the notifications to reach the coordinator
+    prepared.await();
+    // make sure we got all the nodes and no more
+    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
+      Mockito.anyString());
+
+    // kick off the commit phase
+    controller.sendGlobalBarrierReached(p, expected);
+
+    // post the committed node for each expected node
+    for (ZKProcedureMemberRpcs cc : cohortControllers) {
+      cc.sendMemberCompleted(sub);
+    }
+
+    // wait for all commit notifications to reach the coordinator
+    committed.await();
+    // make sure we got all the nodes and no more
+    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
+      Mockito.anyString());
+
+    controller.resetMembers(p);
+
+    // verify all behavior
+    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
+    verifyCohort(member, cohortControllers.size(), operationName, data);
+    verifyCoordinator(operationName, coordinator, expected);
+  }
+
+  // TODO Broken by composition.
+//  @Test
+//  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
+//    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
+//      "cohort1", "cohort2");
+//    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
+//      "cohort1", "cohort2");
+//  }
+
+  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
+      String... cohort) throws Exception {
+    ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+    List<String> expected = Lists.newArrayList(cohort);
+    // NOTE: only referenced from the commented-out testCoordinatorControllerHandlesEarlyPrepareNodes
+    final Subprocedure sub = Mockito.mock(Subprocedure.class);
+    Mockito.when(sub.getName()).thenReturn(operationName);
+
+    final CountDownLatch prepared = new CountDownLatch(expected.size());
+    final CountDownLatch committed = new CountDownLatch(expected.size());
+    // mock out coordinator so we can keep track of zk progress
+    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
+      prepared, committed);
+
+    ProcedureMember member = Mockito.mock(ProcedureMember.class);
+    Procedure p = Mockito.mock(Procedure.class);
+    Mockito.when(p.getName()).thenReturn(operationName);
+
+    Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
+        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
+    ZKProcedureCoordinatorRpcs controller = pair.getFirst();
+    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
+
+    // post 1/2 the prepare nodes early, i.e. before the coordinator starts the operation
+    for (int i = 0; i < cohortControllers.size() / 2; i++) {
+      cohortControllers.get(i).sendMemberAcquired(sub);
+    }
+
+    // start the operation
+    controller.sendGlobalBarrierAcquire(p, data, expected);
+
+    // post the prepare node for each expected node (the early half is posted a second time)
+    for (ZKProcedureMemberRpcs cc : cohortControllers) {
+      cc.sendMemberAcquired(sub);
+    }
+
+    // wait for all the notifications to reach the coordinator
+    prepared.await();
+    // make sure we got all the nodes and no more
+    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
+      Mockito.anyString());
+
+    // kick off the commit phase
+    controller.sendGlobalBarrierReached(p, expected);
+
+    // post the committed node for each expected node
+    for (ZKProcedureMemberRpcs cc : cohortControllers) {
+      cc.sendMemberCompleted(sub);
+    }
+
+    // wait for all commit notifications to reach the coordinator
+    committed.await();
+    // make sure we got all the nodes and no more
+    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
+      Mockito.anyString());
+
+    controller.resetMembers(p);
+
+    // verify all behavior
+    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
+    verifyCohort(member, cohortControllers.size(), operationName, data);
+    verifyCoordinator(operationName, coordinator, expected);
+  }
+
+  /**
+   * Build a mock coordinator whose barrier callbacks for the given operation release the latches.
+   * @return a mock {@link ProcedureCoordinator} that just counts down the
+   *         prepared and committed latch for calls to the respective method
+   */
+  private ProcedureCoordinator setupMockCoordinator(String operationName,
+      final CountDownLatch prepared, final CountDownLatch committed) {
+    // stub only the two barrier callbacks; every other method keeps Mockito's default behavior
+    ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        prepared.countDown();
+        return null;
+      }
+    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        committed.countDown();
+        return null;
+      }
+    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString());
+    return coordinator;
+  }
+
+  /**
+   * Assert that checkExists reports -1 for the operation's acquire, reached and abort znodes
+   */
+  private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
+      ZKProcedureUtil controller) throws Exception {
+    String acquireZnode = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
+    String reachedZnode = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
+    String abortZnode = ZKProcedureUtil.getAbortNode(controller, operationName);
+    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, acquireZnode));
+    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, reachedZnode));
+    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abortZnode));
+  }
+
+  /**
+   * Verify the cohort controller got called once per expected node to start the operation
+   */
+  private void verifyCohort(ProcedureMember member, int cohortSize,
+      String operationName, byte[] data) {
+    // operationName and data are currently unused: any submitted Subprocedure is counted,
+    // regardless of which operation or payload it was created for
+    final VerificationMode oncePerCohortMember = Mockito.times(cohortSize);
+    verify(member, oncePerCohortMember).submitSubprocedure(Mockito.any(Subprocedure.class));
+  }
+
+  /**
+   * Verify that the coordinator got exactly one acquired and one finished call per expected node
+   */
+  private void verifyCoordinator(String operationName,
+      ProcedureCoordinator coordinator, List<String> expected) {
+    // each member name must be reported exactly once for both barrier notifications
+    for (String memberName : expected) {
+      verify(coordinator, once).memberAcquiredBarrier(operationName, memberName);
+      verify(coordinator, once).memberFinishedBarrier(operationName, memberName);
+    }
+  }
+
+  /**
+   * Strategy for starting the real (not spy/mockable) controllers for a test; returns the
+   * coordinator rpcs paired with the member rpcs started for the cohort names.
+   */
+  private abstract static class StartControllers {
+    public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+        ZooKeeperWatcher watcher, String operationName, ProcedureCoordinator coordinator,
+        String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception;
+  }
+
+  /**
+   * Starts the coordinator rpcs first, then one member rpcs per expected cohort name.
+   */
+  private final StartControllers startCoordinatorFirst = new StartControllers() {
+    @Override
+    public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+        ZooKeeperWatcher watcher, String operationName,
+        ProcedureCoordinator coordinator, String controllerName,
+        ProcedureMember member, List<String> expected) throws Exception {
+      // start the controller under the name the caller passed in
+      ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
+          watcher, operationName, controllerName);
+      controller.start(coordinator);
+      // make a cohort controller for each expected node
+      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
+      for (String nodeName : expected) {
+        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
+            watcher, operationName, nodeName);
+        cc.start(member);
+        cohortControllers.add(cc);
+      }
+      return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
+          controller, cohortControllers);
+    }
+  };
+
+  /**
+   * Starts one member rpcs per expected cohort name first and the coordinator rpcs last, i.e.
+   * the reverse order of startCoordinatorFirst, so tests can run under both start orders.
+   * NOTE(review): the earlier comment spoke of a member starting after the controller and
+   * missing a new operation, which is the opposite order - confirm which race is meant.
+   */
+  private final StartControllers startCohortFirst = new StartControllers() {
+    @Override
+    public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+        ZooKeeperWatcher watcher, String operationName,
+        ProcedureCoordinator coordinator, String controllerName,
+        ProcedureMember member, List<String> expected) throws Exception {
+      // make a cohort controller for each expected node
+      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
+      for (String nodeName : expected) {
+        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
+            watcher, operationName, nodeName);
+        cc.start(member);
+        cohortControllers.add(cc);
+      }
+
+      // start the controller under the name the caller passed in
+      ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
+          watcher, operationName, controllerName);
+      controller.start(coordinator);
+
+      return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
+          controller, cohortControllers);
+    }
+  };
+}