You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/03/04 12:24:53 UTC
svn commit: r1452257 [12/14] - in /hbase/branches/0.94:
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/test/java/org/apache/hadoop/hbase/security/access/
src/main/jamon/org/apache/hadoop/hbase/tmpl/master/ src/main/java/or...
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Demonstrate how Procedure handles single members, multiple members, and errors semantics
+ */
+@Category(SmallTests.class)
+public class TestProcedure {
+
+ ProcedureCoordinator coord;
+
+ @Before
+ public void setup() {
+ coord = mock(ProcedureCoordinator.class);
+ final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
+ when(coord.getRpcs()).thenReturn(comms); // make it not null
+ }
+
+ class LatchedProcedure extends Procedure {
+ CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
+ CountDownLatch startedDuringBarrier = new CountDownLatch(1);
+ CountDownLatch completedProcedure = new CountDownLatch(1);
+
+ public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
+ long wakeFreq, long timeout, String opName, byte[] data,
+ List<String> expectedMembers) {
+ super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
+ }
+
+ @Override
+ public void sendGlobalBarrierStart() {
+ startedAcquireBarrier.countDown();
+ }
+
+ @Override
+ public void sendGlobalBarrierReached() {
+ startedDuringBarrier.countDown();
+ }
+
+ @Override
+ public void sendGlobalBarrierComplete() {
+ completedProcedure.countDown();
+ }
+ };
+
+ /**
+ * With a single member, verify ordered execution. The Coordinator side is run in a separate
+ * thread so we can only trigger from members and wait for particular state latches.
+ */
+ @Test(timeout = 1000)
+ public void testSingleMember() throws Exception {
+ // The member
+ List<String> members = new ArrayList<String>();
+ members.add("member");
+ LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+ Integer.MAX_VALUE, "op", null, members);
+ final LatchedProcedure procspy = spy(proc);
+ // coordinator: start the barrier procedure
+ new Thread() {
+ public void run() {
+ procspy.call();
+ }
+ }.start();
+
+ // coordinator: wait for the barrier to be acquired, then send start barrier
+ proc.startedAcquireBarrier.await();
+
+ // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
+ verify(procspy).sendGlobalBarrierStart();
+ verify(procspy, never()).sendGlobalBarrierReached();
+ verify(procspy, never()).sendGlobalBarrierComplete();
+ verify(procspy, never()).barrierAcquiredByMember(anyString());
+
+ // member: trigger global barrier acquisition
+ proc.barrierAcquiredByMember(members.get(0));
+
+ // coordinator: wait for global barrier to be acquired.
+ proc.acquiredBarrierLatch.await();
+ verify(procspy).sendGlobalBarrierStart(); // old news
+
+ // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
+ // or was not called here.
+
+ // member: trigger global barrier release
+ proc.barrierReleasedByMember(members.get(0));
+
+ // coordinator: wait for procedure to be completed
+ proc.completedProcedure.await();
+ verify(procspy).sendGlobalBarrierReached();
+ verify(procspy).sendGlobalBarrierComplete();
+ verify(procspy, never()).receive(any(ForeignException.class));
+ }
+
+ @Test(timeout=1000)
+ public void testMultipleMember() throws Exception {
+ // 2 members
+ List<String> members = new ArrayList<String>();
+ members.add("member1");
+ members.add("member2");
+
+ LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+ Integer.MAX_VALUE, "op", null, members);
+ final LatchedProcedure procspy = spy(proc);
+ // start the barrier procedure
+ new Thread() {
+ public void run() {
+ procspy.call();
+ }
+ }.start();
+
+ // coordinator: wait for the barrier to be acquired, then send start barrier
+ procspy.startedAcquireBarrier.await();
+
+ // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
+ verify(procspy).sendGlobalBarrierStart();
+ verify(procspy, never()).sendGlobalBarrierReached();
+ verify(procspy, never()).sendGlobalBarrierComplete();
+ verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
+
+ // member0: [1/2] trigger global barrier acquisition.
+ procspy.barrierAcquiredByMember(members.get(0));
+
+ // coordinator not satisified.
+ verify(procspy).sendGlobalBarrierStart();
+ verify(procspy, never()).sendGlobalBarrierReached();
+ verify(procspy, never()).sendGlobalBarrierComplete();
+
+ // member 1: [2/2] trigger global barrier acquisition.
+ procspy.barrierAcquiredByMember(members.get(1));
+
+ // coordinator: wait for global barrier to be acquired.
+ procspy.startedDuringBarrier.await();
+ verify(procspy).sendGlobalBarrierStart(); // old news
+
+ // member 1, 2: trigger global barrier release
+ procspy.barrierReleasedByMember(members.get(0));
+ procspy.barrierReleasedByMember(members.get(1));
+
+ // coordinator wait for procedure to be completed
+ procspy.completedProcedure.await();
+ verify(procspy).sendGlobalBarrierReached();
+ verify(procspy).sendGlobalBarrierComplete();
+ verify(procspy, never()).receive(any(ForeignException.class));
+ }
+
+ @Test(timeout = 1000)
+ public void testErrorPropagation() throws Exception {
+ List<String> members = new ArrayList<String>();
+ members.add("member");
+ Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
+ Integer.MAX_VALUE, "op", null, members);
+ final Procedure procspy = spy(proc);
+
+ ForeignException cause = new ForeignException("SRC", "External Exception");
+ proc.receive(cause);
+
+ // start the barrier procedure
+ Thread t = new Thread() {
+ public void run() {
+ procspy.call();
+ }
+ };
+ t.start();
+ t.join();
+
+ verify(procspy, never()).sendGlobalBarrierStart();
+ verify(procspy, never()).sendGlobalBarrierReached();
+ verify(procspy).sendGlobalBarrierComplete();
+ }
+
+ @Test(timeout = 1000)
+ public void testBarrieredErrorPropagation() throws Exception {
+ List<String> members = new ArrayList<String>();
+ members.add("member");
+ LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+ Integer.MAX_VALUE, "op", null, members);
+ final LatchedProcedure procspy = spy(proc);
+
+ // start the barrier procedure
+ Thread t = new Thread() {
+ public void run() {
+ procspy.call();
+ }
+ };
+ t.start();
+
+ // now test that we can put an error in before the commit phase runs
+ procspy.startedAcquireBarrier.await();
+ ForeignException cause = new ForeignException("SRC", "External Exception");
+ procspy.receive(cause);
+ procspy.barrierAcquiredByMember(members.get(0));
+ t.join();
+
+ // verify state of all the object
+ verify(procspy).sendGlobalBarrierStart();
+ verify(procspy).sendGlobalBarrierComplete();
+ verify(procspy, never()).sendGlobalBarrierReached();
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test Procedure coordinator operation.
+ * <p>
+ * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
+ * level serialization this class will likely throw all kinds of errors.
+ */
+@Category(SmallTests.class)
+public class TestProcedureCoordinator {
+ // general test constants
+ private static final long WAKE_FREQUENCY = 1000;
+ private static final long TIMEOUT = 100000;
+ private static final long POOL_KEEP_ALIVE = 1;
+ private static final String nodeName = "node";
+ private static final String procName = "some op";
+ private static final byte[] procData = new byte[0];
+ private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
+
+ // setup the mocks
+ private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
+ private final Procedure task = mock(Procedure.class);
+ private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
+
+ // handle to the coordinator for each test
+ private ProcedureCoordinator coordinator;
+
+ @After
+ public void resetTest() throws IOException {
+ // reset all the mocks used for the tests
+ reset(controller, task, monitor);
+ // close the open coordinator, if it was used
+ if (coordinator != null) coordinator.close();
+ }
+
+ private ProcedureCoordinator buildNewCoordinator() {
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+ return spy(new ProcedureCoordinator(controller, pool));
+ }
+
+ /**
+ * Currently we can only handle one procedure at a time. This makes sure we handle that and
+ * reject submitting more.
+ */
+ @Test
+ public void testThreadPoolSize() throws Exception {
+ ProcedureCoordinator coordinator = buildNewCoordinator();
+ Procedure proc = new Procedure(coordinator, monitor,
+ WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
+ Procedure procSpy = spy(proc);
+
+ Procedure proc2 = new Procedure(coordinator, monitor,
+ WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
+ Procedure procSpy2 = spy(proc2);
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+ .thenReturn(procSpy, procSpy2);
+
+ coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
+ // null here means second procedure failed to start.
+ assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
+ coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
+ }
+
+ /**
+ * Check handling a connection failure correctly if we get it during the acquiring phase
+ */
+ @Test(timeout = 5000)
+ public void testUnreachableControllerDuringPrepare() throws Exception {
+ coordinator = buildNewCoordinator();
+ // setup the proc
+ List<String> expected = Arrays.asList("cohort");
+ Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
+ TIMEOUT, procName, procData, expected);
+ final Procedure procSpy = spy(proc);
+
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+ .thenReturn(procSpy);
+
+ // use the passed controller responses
+ IOException cause = new IOException("Failed to reach comms during acquire");
+ doThrow(cause).when(controller)
+ .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
+
+ // run the operation
+ proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
+ // and wait for it to finish
+ proc.waitForCompleted();
+ verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
+ verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+ verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
+ verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
+ anyListOf(String.class));
+ }
+
+ /**
+ * Check handling a connection failure correctly if we get it during the barrier phase
+ */
+ @Test(timeout = 5000)
+ public void testUnreachableControllerDuringCommit() throws Exception {
+ coordinator = buildNewCoordinator();
+
+ // setup the task and spy on it
+ List<String> expected = Arrays.asList("cohort");
+ final Procedure spy = spy(new Procedure(coordinator,
+ WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
+
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+ .thenReturn(spy);
+
+ // use the passed controller responses
+ IOException cause = new IOException("Failed to reach controller during prepare");
+ doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
+ .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
+ doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+ // run the operation
+ Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+ // and wait for it to finish
+ task.waitForCompleted();
+ verify(spy, atLeastOnce()).receive(any(ForeignException.class));
+ verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+ verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
+ eq(procData), anyListOf(String.class));
+ verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
+ anyListOf(String.class));
+ }
+
+ @Test(timeout = 1000)
+ public void testNoCohort() throws Exception {
+ runSimpleProcedure();
+ }
+
+ @Test(timeout = 1000)
+ public void testSingleCohortOrchestration() throws Exception {
+ runSimpleProcedure("one");
+ }
+
+ @Test(timeout = 1000)
+ public void testMultipleCohortOrchestration() throws Exception {
+ runSimpleProcedure("one", "two", "three", "four");
+ }
+
+ public void runSimpleProcedure(String... members) throws Exception {
+ coordinator = buildNewCoordinator();
+ Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+ TIMEOUT, procName, procData, Arrays.asList(members));
+ final Procedure spy = spy(task);
+ runCoordinatedProcedure(spy, members);
+ }
+
+ /**
+ * Test that if nodes join the barrier early we still correctly handle the progress
+ */
+ @Test(timeout = 1000)
+ public void testEarlyJoiningBarrier() throws Exception {
+ final String[] cohort = new String[] { "one", "two", "three", "four" };
+ coordinator = buildNewCoordinator();
+ final ProcedureCoordinator ref = coordinator;
+ Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+ TIMEOUT, procName, procData, Arrays.asList(cohort));
+ final Procedure spy = spy(task);
+
+ AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
+ public void doWork() {
+ // then do some fun where we commit before all nodes have prepared
+ // "one" commits before anyone else is done
+ ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
+ ref.memberFinishedBarrier(this.opName, this.cohort[0]);
+ // but "two" takes a while
+ ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
+ // "three"jumps ahead
+ ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
+ ref.memberFinishedBarrier(this.opName, this.cohort[2]);
+ // and "four" takes a while
+ ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
+ }
+ };
+
+ BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
+ @Override
+ public void doWork() {
+ ref.memberFinishedBarrier(opName, this.cohort[1]);
+ ref.memberFinishedBarrier(opName, this.cohort[3]);
+ }
+ };
+ runCoordinatedOperation(spy, prepare, commit, cohort);
+ }
+
+ /**
+ * Just run a procedure with the standard name and data, with not special task for the mock
+ * coordinator (it works just like a regular coordinator). For custom behavior see
+ * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
+ * .
+ * @param spy Spy on a real {@link Procedure}
+ * @param cohort expected cohort members
+ * @throws Exception on failure
+ */
+ public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
+ runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
+ new BarrierAnswer(procName, cohort), cohort);
+ }
+
+ public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
+ String... cohort) throws Exception {
+ runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
+ }
+
+ public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
+ String... cohort) throws Exception {
+ runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
+ }
+
+ public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
+ BarrierAnswer commitOperation, String... cohort) throws Exception {
+ List<String> expected = Arrays.asList(cohort);
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+ .thenReturn(spy);
+
+ // use the passed controller responses
+ doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
+ doAnswer(commitOperation).when(controller)
+ .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+ // run the operation
+ Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+ // and wait for it to finish
+ task.waitForCompleted();
+
+ // make sure we mocked correctly
+ prepareOperation.ensureRan();
+ // we never got an exception
+ InOrder inorder = inOrder(spy, controller);
+ inorder.verify(spy).sendGlobalBarrierStart();
+ inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
+ inorder.verify(spy).sendGlobalBarrierReached();
+ inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
+ }
+
+ private abstract class OperationAnswer implements Answer<Void> {
+ private boolean ran = false;
+
+ public void ensureRan() {
+ assertTrue("Prepare mocking didn't actually run!", ran);
+ }
+
+ @Override
+ public final Void answer(InvocationOnMock invocation) throws Throwable {
+ this.ran = true;
+ doWork();
+ return null;
+ }
+
+ protected abstract void doWork() throws Throwable;
+ }
+
+ /**
+ * Just tell the current coordinator that each of the nodes has prepared
+ */
+ private class AcquireBarrierAnswer extends OperationAnswer {
+ protected final String[] cohort;
+ protected final String opName;
+
+ public AcquireBarrierAnswer(String opName, String... cohort) {
+ this.cohort = cohort;
+ this.opName = opName;
+ }
+
+ @Override
+ public void doWork() {
+ if (cohort == null) return;
+ for (String member : cohort) {
+ TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
+ }
+ }
+ }
+
+ /**
+ * Just tell the current coordinator that each of the nodes has committed
+ */
+ private class BarrierAnswer extends OperationAnswer {
+ protected final String[] cohort;
+ protected final String opName;
+
+ public BarrierAnswer(String opName, String... cohort) {
+ this.cohort = cohort;
+ this.opName = opName;
+ }
+
+ @Override
+ public void doWork() {
+ if (cohort == null) return;
+ for (String member : cohort) {
+ TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the procedure member, and it's error handling mechanisms.
+ */
+@Category(SmallTests.class)
+public class TestProcedureMember {
+ private static final long WAKE_FREQUENCY = 100;
+ private static final long TIMEOUT = 100000;
+ private static final long POOL_KEEP_ALIVE = 1;
+
+ private final String op = "some op";
+ private final byte[] data = new byte[0];
+ private final ForeignExceptionDispatcher mockListener = Mockito
+ .spy(new ForeignExceptionDispatcher());
+ private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
+ private final ProcedureMemberRpcs mockMemberComms = Mockito
+ .mock(ProcedureMemberRpcs.class);
+ private ProcedureMember member;
+ private ForeignExceptionDispatcher dispatcher;
+ Subprocedure spySub;
+
+ /**
+ * Reset all the mock objects
+ */
+ @After
+ public void resetTest() {
+ reset(mockListener, mockBuilder, mockMemberComms);
+ if (member != null)
+ try {
+ member.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Build a member using the class level mocks
+ * @return member to use for tests
+ */
+ private ProcedureMember buildCohortMember() {
+ String name = "node";
+ ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+ return new ProcedureMember(mockMemberComms, pool, mockBuilder);
+ }
+
+ /**
+ * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
+ */
+ private void buildCohortMemberPair() throws IOException {
+ dispatcher = new ForeignExceptionDispatcher();
+ String name = "node";
+ ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+ member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+ when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
+ Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
+ spySub = spy(subproc);
+ when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
+ addCommitAnswer();
+ }
+
+
+ /**
+ * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
+ */
+ private void addCommitAnswer() throws IOException {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ member.receivedReachedGlobalBarrier(op);
+ return null;
+ }
+ }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+ }
+
+ /**
+ * Test the normal sub procedure execution case.
+ */
+ @Test(timeout = 500)
+ public void testSimpleRun() throws Exception {
+ member = buildCohortMember();
+ EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
+ EmptySubprocedure spy = spy(subproc);
+ when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+ // when we get a prepare, then start the commit phase
+ addCommitAnswer();
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc1 = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc1);
+ // and wait for it to finish
+ subproc.waitForLocallyCompleted();
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spy);
+ order.verify(spy).acquireBarrier();
+ order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
+ order.verify(spy).insideBarrier();
+ order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
+ order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
+ any(ForeignException.class));
+ }
+
+ /**
+ * Make sure we call cleanup etc, when we have an exception during
+ * {@link Subprocedure#acquireBarrier()}.
+ */
+ @Test(timeout = 1000)
+ public void testMemberPrepareException() throws Exception {
+ buildCohortMemberPair();
+
+ // mock an exception on Subprocedure's prepare
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ throw new IOException("Forced IOException in member acquireBarrier");
+ }
+ }).when(spySub).acquireBarrier();
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ member.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spySub);
+ order.verify(spySub).acquireBarrier();
+ // Later phases not run
+ order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
+ order.verify(spySub, never()).insideBarrier();
+ order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+ // error recovery path exercised
+ order.verify(spySub).cancel(anyString(), any(Exception.class));
+ order.verify(spySub).cleanup(any(Exception.class));
+ }
+
+ /**
+ * Make sure we call cleanup etc, when we have an exception during prepare.
+ */
+ @Test(timeout = 1000)
+ public void testSendMemberAcquiredCommsFailure() throws Exception {
+ buildCohortMemberPair();
+
+ // mock an exception on Subprocedure's prepare
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ throw new IOException("Forced IOException in memeber prepare");
+ }
+ }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ member.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spySub);
+ order.verify(spySub).acquireBarrier();
+ order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+
+ // Later phases not run
+ order.verify(spySub, never()).insideBarrier();
+ order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+ // error recovery path exercised
+ order.verify(spySub).cancel(anyString(), any(Exception.class));
+ order.verify(spySub).cleanup(any(Exception.class));
+ }
+
+ /**
+ * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
+ * running {@link Subprocedure#prepare} -- prepare needs to finish first, and the the abort
+ * is checked. Thus, the {@link Subprocedure#prepare} should succeed but later get rolled back
+ * via {@link Subprocedure#cleanup}.
+ */
+ @Test(timeout = 1000)
+ public void testCoordinatorAbort() throws Exception {
+ buildCohortMemberPair();
+
+ // mock that another node timed out or failed to prepare
+ final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // inject a remote error (this would have come from an external thread)
+ spySub.cancel("bogus message", oate);
+ // sleep the wake frequency since that is what we promised
+ Thread.sleep(WAKE_FREQUENCY);
+ return null;
+ }
+ }).when(spySub).waitForReachedGlobalBarrier();
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ member.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spySub);
+ order.verify(spySub).acquireBarrier();
+ order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+ // Later phases not run
+ order.verify(spySub, never()).insideBarrier();
+ order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+ // error recovery path exercised
+ order.verify(spySub).cancel(anyString(), any(Exception.class));
+ order.verify(spySub).cleanup(any(Exception.class));
+ }
+
+ /**
+ * Handle failures if a member's commit phase fails.
+ *
+ * NOTE: This is the core difference that makes this different from traditional 2PC. In true
+ * 2PC the transaction is committed just before the coordinator sends commit messages to the
+ * member. Members are then responsible for reading its TX log. This implementation actually
+ * rolls back, and thus breaks the normal TX guarantees.
+ */
+ @Test(timeout = 1000)
+ public void testMemberCommitException() throws Exception {
+ buildCohortMemberPair();
+
+ // mock an exception on Subprocedure's prepare
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ throw new IOException("Forced IOException in memeber prepare");
+ }
+ }).when(spySub).insideBarrier();
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ member.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spySub);
+ order.verify(spySub).acquireBarrier();
+ order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+ order.verify(spySub).insideBarrier();
+
+ // Later phases not run
+ order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+ // error recovery path exercised
+ order.verify(spySub).cancel(anyString(), any(Exception.class));
+ order.verify(spySub).cleanup(any(Exception.class));
+ }
+
+ /**
+ * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
+ *
+ * NOTE: This is the core difference that makes this different from traditional 2PC. In true
+ * 2PC the transaction is committed just before the coordinator sends commit messages to the
+ * member. Members are then responsible for reading its TX log. This implementation actually
+ * rolls back, and thus breaks the normal TX guarantees.
+ */
+ @Test(timeout = 1000)
+ public void testMemberCommitCommsFailure() throws Exception {
+ buildCohortMemberPair();
+ final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // inject a remote error (this would have come from an external thread)
+ spySub.cancel("commit comms fail", oate);
+ // sleep the wake frequency since that is what we promised
+ Thread.sleep(WAKE_FREQUENCY);
+ return null;
+ }
+ }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ member.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spySub);
+ order.verify(spySub).acquireBarrier();
+ order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+ order.verify(spySub).insideBarrier();
+ order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
+ // error recovery path exercised
+ order.verify(spySub).cancel(anyString(), any(Exception.class));
+ order.verify(spySub).cleanup(any(Exception.class));
+ }
+
+ /**
+ * Fail correctly on getting an external error while waiting for the prepared latch
+ * @throws Exception on failure
+ */
+ @Test(timeout = 1000)
+ public void testPropagateConnectionErrorBackToManager() throws Exception {
+ // setup the operation
+ member = buildCohortMember();
+ ProcedureMember memberSpy = spy(member);
+
+ // setup the commit and the spy
+ final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
+ ForeignExceptionDispatcher dispSpy = spy(dispatcher);
+ Subprocedure commit = new EmptySubprocedure(member, dispatcher);
+ Subprocedure spy = spy(commit);
+ when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+ // fail during the prepare phase
+ doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
+ // and throw a connection error when we try to tell the controller about it
+ doThrow(new IOException("Controller is down!")).when(mockMemberComms)
+ .sendMemberAborted(eq(spy), any(ForeignException.class));
+
+
+ // run the operation
+ // build a new operation
+ Subprocedure subproc = memberSpy.createSubprocedure(op, data);
+ memberSpy.submitSubprocedure(subproc);
+ // if the operation doesn't die properly, then this will timeout
+ memberSpy.closeAndWait(TIMEOUT);
+
+ // make sure everything ran in order
+ InOrder order = inOrder(mockMemberComms, spy, dispSpy);
+ // make sure we acquire.
+ order.verify(spy).acquireBarrier();
+ order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
+
+ // TODO Need to do another refactor to get this to propagate to the coordinator.
+ // make sure we pass a remote exception back the controller
+// order.verify(mockMemberComms).sendMemberAborted(eq(spy),
+// any(ExternalException.class));
+// order.verify(dispSpy).receiveError(anyString(),
+// any(ExternalException.class), any());
+ }
+
+ /**
+ * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
+ * correctly build a new task for the requested operation
+ * @throws Exception on failure
+ */
+ @Test
+ public void testNoTaskToBeRunFromRequest() throws Exception {
+ ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
+ when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
+ .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
+ member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+ // builder returns null
+ // build a new operation
+ Subprocedure subproc = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc);
+ // throws an illegal state exception
+ try {
+ // build a new operation
+ Subprocedure subproc2 = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc2);
+ } catch (IllegalStateException ise) {
+ }
+ // throws an illegal argument exception
+ try {
+ // build a new operation
+ Subprocedure subproc3 = member.createSubprocedure(op, data);
+ member.submitSubprocedure(subproc3);
+ } catch (IllegalArgumentException iae) {
+ }
+
+ // no request should reach the pool
+ verifyZeroInteractions(pool);
+ // get two abort requests
+ // TODO Need to do another refactor to get this to propagate to the coordinator.
+ // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
+ }
+
+ /**
+ * Helper {@link Procedure} who's phase for each step is just empty
+ */
+ public class EmptySubprocedure extends SubprocedureImpl {
+ public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
+ super( member, op, dispatcher,
+ // TODO 1000000 is an arbitrary number that I picked.
+ WAKE_FREQUENCY, TIMEOUT);
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.internal.matchers.ArrayEquals;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
+ */
+@Category(MediumTests.class)
+public class TestZKProcedure {
+
+ private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final String COORDINATOR_NODE_NAME = "coordinator";
+ private static final long KEEP_ALIVE = 100; // seconds
+ private static final int POOL_SIZE = 1;
+ private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
+ private static final long WAKE_FREQUENCY = 500;
+ private static final String opName = "op";
+ private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
+ private static final VerificationMode once = Mockito.times(1);
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
+ return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ throw new RuntimeException(
+ "Unexpected abort in distributed three phase commit test:" + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void testEmptyMemberSet() throws Exception {
+ runCommit();
+ }
+
+ @Test
+ public void testSingleMember() throws Exception {
+ runCommit("one");
+ }
+
+ @Test
+ public void testMultipleMembers() throws Exception {
+ runCommit("one", "two", "three", "four" );
+ }
+
+ private void runCommit(String... members) throws Exception {
+ // make sure we just have an empty list
+ if (members == null) {
+ members = new String[0];
+ }
+ List<String> expected = Arrays.asList(members);
+
+ // setup the constants
+ ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
+ String opDescription = "coordination test - " + members.length + " cohort members";
+
+ // start running the controller
+ ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
+ coordZkw, opDescription, COORDINATOR_NODE_NAME);
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+ ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
+ @Override
+ public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
+ List<String> expectedMembers) {
+ return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
+ }
+ };
+
+ // build and start members
+ // NOTE: There is a single subprocedure builder for all members here.
+ SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+ members.length);
+ // start each member
+ for (String member : members) {
+ ZooKeeperWatcher watcher = newZooKeeperWatcher();
+ ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+ ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
+ procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
+ comms.start(procMember);
+ }
+
+ // setup mock member subprocedures
+ final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
+ for (int i = 0; i < procMembers.size(); i++) {
+ ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+ Subprocedure commit = Mockito
+ .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
+ WAKE_FREQUENCY, TIMEOUT));
+ subprocs.add(commit);
+ }
+
+ // link subprocedure to buildNewOperation invocation.
+ final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
+ Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
+ (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+ new Answer<Subprocedure>() {
+ @Override
+ public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+ int index = i.getAndIncrement();
+ LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
+ Subprocedure commit = subprocs.get(index);
+ return commit;
+ }
+ });
+
+ // setup spying on the coordinator
+// Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
+// Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
+
+ // start running the operation
+ Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
+// assertEquals("Didn't mock coordinator task", proc, task);
+
+ // verify all things ran as expected
+// waitAndVerifyProc(proc, once, once, never(), once, false);
+ waitAndVerifyProc(task, once, once, never(), once, false);
+ verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
+
+ // close all the things
+ closeAll(coordinator, coordinatorComms, procMembers);
+ }
+
+ /**
+ * Test a distributed commit with multiple cohort members, where one of the cohort members has a
+ * timeout exception during the prepare stage.
+ */
+ @Test
+ public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
+ String opDescription = "error injection coordination";
+ String[] cohortMembers = new String[] { "one", "two", "three" };
+ List<String> expected = Lists.newArrayList(cohortMembers);
+ // error constants
+ final int memberErrorIndex = 2;
+ final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
+
+ // start running the coordinator and its controller
+ ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
+ ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
+ coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+ ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
+
+ // start a member for each node
+ SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+ expected.size());
+ for (String member : expected) {
+ ZooKeeperWatcher watcher = newZooKeeperWatcher();
+ ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+ ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
+ members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
+ controller.start(mem);
+ }
+
+ // setup mock subprocedures
+ final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
+ final int[] elem = new int[1];
+ for (int i = 0; i < members.size(); i++) {
+ ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+ ProcedureMember comms = members.get(i).getFirst();
+ Subprocedure commit = Mockito
+ .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
+ // This nasty bit has one of the impls throw a TimeoutException
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ int index = elem[0];
+ if (index == memberErrorIndex) {
+ LOG.debug("Sending error to coordinator");
+ ForeignException remoteCause = new ForeignException("TIMER",
+ new TimeoutException("subprocTimeout" , 1, 2, 0));
+ Subprocedure r = ((Subprocedure) invocation.getMock());
+ LOG.error("Remote commit failure, not propagating error:" + remoteCause);
+ r.monitor.receive(remoteCause);
+ // don't complete the error phase until the coordinator has gotten the error
+ // notification (which ensures that we never progress past prepare)
+ try {
+ Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
+ WAKE_FREQUENCY, "coordinator received error");
+ } catch (InterruptedException e) {
+ LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ }
+ }
+ elem[0] = ++index;
+ return null;
+ }
+ }).when(commit).acquireBarrier();
+ cohortTasks.add(commit);
+ }
+
+ // pass out a task per member
+ final int[] i = new int[] { 0 };
+ Mockito.when(
+ subprocFactory.buildSubprocedure(Mockito.eq(opName),
+ (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+ new Answer<Subprocedure>() {
+ @Override
+ public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+ int index = i[0];
+ Subprocedure commit = cohortTasks.get(index);
+ index++;
+ i[0] = index;
+ return commit;
+ }
+ });
+
+ // setup spying on the coordinator
+ ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
+ .spy(new ForeignExceptionDispatcher());
+ Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
+ coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
+ opName, data, expected));
+ when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
+ .thenReturn(coordinatorTask);
+ // count down the error latch when we get the remote error
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ // pass on the error to the master
+ invocation.callRealMethod();
+ // then count down the got error latch
+ coordinatorReceivedErrorLatch.countDown();
+ return null;
+ }
+ }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
+
+ // ----------------------------
+ // start running the operation
+ // ----------------------------
+
+ Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
+ assertEquals("Didn't mock coordinator task", coordinatorTask, task);
+
+ // wait for the task to complete
+ try {
+ task.waitForCompleted();
+ } catch (ForeignException fe) {
+ // this may get caught or may not
+ }
+
+ // -------------
+ // verification
+ // -------------
+ waitAndVerifyProc(coordinatorTask, once, never(), once, once, true);
+ verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
+ once, true);
+
+ // close all the open things
+ closeAll(coordinator, coordinatorController, members);
+ }
+
+ /**
+ * Wait for the coordinator task to complete, and verify all the mocks
+ * @param task to wait on
+ * @throws Exception on unexpected failure
+ */
+ private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
+ VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+ throws Exception {
+ boolean caughtError = false;
+ try {
+ proc.waitForCompleted();
+ } catch (ForeignException fe) {
+ caughtError = true;
+ }
+ // make sure that the task called all the expected phases
+ Mockito.verify(proc, prepare).sendGlobalBarrierStart();
+ Mockito.verify(proc, commit).sendGlobalBarrierReached();
+ Mockito.verify(proc, finish).sendGlobalBarrierComplete();
+ assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
+ .hasException());
+ assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+ }
+
+ /**
+ * Wait for the coordinator task to complete, and verify all the mocks
+ * @param task to wait on
+ * @throws Exception on unexpected failure
+ */
+ private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
+ VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+ throws Exception {
+ boolean caughtError = false;
+ try {
+ op.waitForLocallyCompleted();
+ } catch (ForeignException fe) {
+ caughtError = true;
+ }
+ // make sure that the task called all the expected phases
+ Mockito.verify(op, prepare).acquireBarrier();
+ Mockito.verify(op, commit).insideBarrier();
+ // We cannot guarantee that cleanup has run so we don't check it.
+
+ assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
+ .hasException());
+ assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+ }
+
+ private void verifyCohortSuccessful(List<String> cohortNames,
+ SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
+ VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
+ VerificationMode finish, boolean opHasError) throws Exception {
+
+ // make sure we build the correct number of cohort members
+ Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
+ Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
+ // verify that we ran each of the operations cleanly
+ int j = 0;
+ for (Subprocedure op : cohortTasks) {
+ LOG.debug("Checking mock:" + (j++));
+ waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
+ }
+ }
+
+ private void closeAll(
+ ProcedureCoordinator coordinator,
+ ZKProcedureCoordinatorRpcs coordinatorController,
+ List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
+ throws IOException {
+ // make sure we close all the resources
+ for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
+ member.getFirst().close();
+ member.getSecond().close();
+ }
+ coordinator.close();
+ coordinatorController.close();
+ }
+}
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test zookeeper-based, procedure controllers
+ */
+@Category(MediumTests.class)
+public class TestZKProcedureControllers {
+
+ static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
+ private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final String COHORT_NODE_NAME = "expected";
+ private static final String CONTROLLER_NODE_NAME = "controller";
+ private static final VerificationMode once = Mockito.times(1);
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ /**
+ * Smaller test to just test the actuation on the cohort member
+ * @throws Exception on failure
+ */
+ @Test(timeout = 15000)
+ public void testSimpleZKCohortMemberController() throws Exception {
+ ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+ final String operationName = "instanceTest";
+
+ final Subprocedure sub = Mockito.mock(Subprocedure.class);
+ Mockito.when(sub.getName()).thenReturn(operationName);
+
+ final byte[] data = new byte[] { 1, 2, 3 };
+ final CountDownLatch prepared = new CountDownLatch(1);
+ final CountDownLatch committed = new CountDownLatch(1);
+
+ final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
+ final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
+ watcher, "testSimple", COHORT_NODE_NAME);
+
+ // mock out cohort member callbacks
+ final ProcedureMember member = Mockito
+ .mock(ProcedureMember.class);
+ Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ controller.sendMemberAcquired(sub);
+ prepared.countDown();
+ return null;
+ }
+ }).when(member).submitSubprocedure(sub);
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ controller.sendMemberCompleted(sub);
+ committed.countDown();
+ return null;
+ }
+ }).when(member).receivedReachedGlobalBarrier(operationName);
+
+ // start running the listener
+ controller.start(member);
+
+ // set a prepare node from a 'coordinator'
+ String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
+ ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
+ // wait for the operation to be prepared
+ prepared.await();
+
+ // create the commit node so we update the operation to enter the commit phase
+ String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
+ LOG.debug("Found prepared, posting commit node:" + commit);
+ ZKUtil.createAndFailSilent(watcher, commit);
+ LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
+ committed.await();
+
+ verify(monitor, never()).receive(Mockito.any(ForeignException.class));
+ // XXX: broken due to composition.
+// verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
+// Mockito.any(IOException.class));
+ // cleanup after the test
+ ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
+ assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
+ assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
+ }
+
+ @Test(timeout = 15000)
+ public void testZKCoordinatorControllerWithNoCohort() throws Exception {
+ final String operationName = "no cohort controller test";
+ final byte[] data = new byte[] { 1, 2, 3 };
+
+ runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
+ runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
+ }
+
+ @Test(timeout = 15000)
+ public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
+ final String operationName = "single member controller test";
+ final byte[] data = new byte[] { 1, 2, 3 };
+
+ runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
+ runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
+ }
+
+ @Test(timeout = 15000)
+ public void testZKCoordinatorControllerMultipleCohort() throws Exception {
+ final String operationName = "multi member controller test";
+ final byte[] data = new byte[] { 1, 2, 3 };
+
+ runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
+ "cohort2", "cohort3");
+ runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
+ "cohort2", "cohort3");
+ }
+
+ private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
+ String operationName, byte[] data, String... cohort) throws Exception {
+ ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+ List<String> expected = Lists.newArrayList(cohort);
+
+ final Subprocedure sub = Mockito.mock(Subprocedure.class);
+ Mockito.when(sub.getName()).thenReturn(operationName);
+
+ CountDownLatch prepared = new CountDownLatch(expected.size());
+ CountDownLatch committed = new CountDownLatch(expected.size());
+ // mock out coordinator so we can keep track of zk progress
+ ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
+ prepared, committed);
+
+ ProcedureMember member = Mockito.mock(ProcedureMember.class);
+
+ Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
+ .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
+ ZKProcedureCoordinatorRpcs controller = pair.getFirst();
+ List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
+ // start the operation
+ Procedure p = Mockito.mock(Procedure.class);
+ Mockito.when(p.getName()).thenReturn(operationName);
+
+ controller.sendGlobalBarrierAcquire(p, data, expected);
+
+ // post the prepare node for each expected node
+ for (ZKProcedureMemberRpcs cc : cohortControllers) {
+ cc.sendMemberAcquired(sub);
+ }
+
+ // wait for all the notifications to reach the coordinator
+ prepared.await();
+ // make sure we got the all the nodes and no more
+ Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
+ Mockito.anyString());
+
+ // kick off the commit phase
+ controller.sendGlobalBarrierReached(p, expected);
+
+ // post the committed node for each expected node
+ for (ZKProcedureMemberRpcs cc : cohortControllers) {
+ cc.sendMemberCompleted(sub);
+ }
+
+ // wait for all commit notifications to reach the coordinator
+ committed.await();
+ // make sure we got the all the nodes and no more
+ Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
+ Mockito.anyString());
+
+ controller.resetMembers(p);
+
+ // verify all behavior
+ verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
+ verifyCohort(member, cohortControllers.size(), operationName, data);
+ verifyCoordinator(operationName, coordinator, expected);
+ }
+
+ // TODO Broken by composition.
+// @Test
+// public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
+// runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
+// "cohort1", "cohort2");
+// runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
+// "cohort1", "cohort2");
+// }
+
+ public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
+ String... cohort) throws Exception {
+ ZooKeeperWatcher watcher = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+ List<String> expected = Lists.newArrayList(cohort);
+
+ final Subprocedure sub = Mockito.mock(Subprocedure.class);
+ Mockito.when(sub.getName()).thenReturn(operationName);
+
+ final CountDownLatch prepared = new CountDownLatch(expected.size());
+ final CountDownLatch committed = new CountDownLatch(expected.size());
+ // mock out coordinator so we can keep track of zk progress
+ ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
+ prepared, committed);
+
+ ProcedureMember member = Mockito.mock(ProcedureMember.class);
+ Procedure p = Mockito.mock(Procedure.class);
+ Mockito.when(p.getName()).thenReturn(operationName);
+
+ Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
+ .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
+ ZKProcedureCoordinatorRpcs controller = pair.getFirst();
+ List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
+
+ // post 1/2 the prepare nodes early
+ for (int i = 0; i < cohortControllers.size() / 2; i++) {
+ cohortControllers.get(i).sendMemberAcquired(sub);
+ }
+
+ // start the operation
+ controller.sendGlobalBarrierAcquire(p, data, expected);
+
+ // post the prepare node for each expected node
+ for (ZKProcedureMemberRpcs cc : cohortControllers) {
+ cc.sendMemberAcquired(sub);
+ }
+
+ // wait for all the notifications to reach the coordinator
+ prepared.await();
+ // make sure we got the all the nodes and no more
+ Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
+ Mockito.anyString());
+
+ // kick off the commit phase
+ controller.sendGlobalBarrierReached(p, expected);
+
+ // post the committed node for each expected node
+ for (ZKProcedureMemberRpcs cc : cohortControllers) {
+ cc.sendMemberCompleted(sub);
+ }
+
+ // wait for all commit notifications to reach the coordiantor
+ committed.await();
+ // make sure we got the all the nodes and no more
+ Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
+ Mockito.anyString());
+
+ controller.resetMembers(p);
+
+ // verify all behavior
+ verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
+ verifyCohort(member, cohortControllers.size(), operationName, data);
+ verifyCoordinator(operationName, coordinator, expected);
+ }
+
+ /**
+ * @return a mock {@link ProcedureCoordinator} that just counts down the
+ * prepared and committed latch for called to the respective method
+ */
+ private ProcedureCoordinator setupMockCoordinator(String operationName,
+ final CountDownLatch prepared, final CountDownLatch committed) {
+ ProcedureCoordinator coordinator = Mockito
+ .mock(ProcedureCoordinator.class);
+ Mockito.mock(ProcedureCoordinator.class);
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ prepared.countDown();
+ return null;
+ }
+ }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ committed.countDown();
+ return null;
+ }
+ }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString());
+ return coordinator;
+ }
+
+ /**
+ * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
+ */
+ private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
+ ZKProcedureUtil controller) throws Exception {
+ String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
+ String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
+ String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
+ assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
+ assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
+ assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
+ }
+
+ /**
+ * Verify the cohort controller got called once per expected node to start the operation
+ */
+ private void verifyCohort(ProcedureMember member, int cohortSize,
+ String operationName, byte[] data) {
+// verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
+// (byte[]) Mockito.argThat(new ArrayEquals(data)));
+ verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.any(Subprocedure.class));
+
+ }
+
+ /**
+ * Verify that the coordinator only got called once for each expected node
+ */
+ private void verifyCoordinator(String operationName,
+ ProcedureCoordinator coordinator, List<String> expected) {
+ // verify that we got all the expected nodes
+ for (String node : expected) {
+ verify(coordinator, once).memberAcquiredBarrier(operationName, node);
+ verify(coordinator, once).memberFinishedBarrier(operationName, node);
+ }
+ }
+
+ /**
+ * Specify how the controllers that should be started (not spy/mockable) for the test.
+ */
+ private abstract class StartControllers {
+ public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+ ZooKeeperWatcher watcher, String operationName,
+ ProcedureCoordinator coordinator, String controllerName,
+ ProcedureMember member, List<String> cohortNames) throws Exception;
+ }
+
+ private final StartControllers startCoordinatorFirst = new StartControllers() {
+
+ @Override
+ public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+ ZooKeeperWatcher watcher, String operationName,
+ ProcedureCoordinator coordinator, String controllerName,
+ ProcedureMember member, List<String> expected) throws Exception {
+ // start the controller
+ ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
+ watcher, operationName, CONTROLLER_NODE_NAME);
+ controller.start(coordinator);
+
+ // make a cohort controller for each expected node
+
+ List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
+ for (String nodeName : expected) {
+ ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
+ watcher, operationName, nodeName);
+ cc.start(member);
+ cohortControllers.add(cc);
+ }
+ return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
+ controller, cohortControllers);
+ }
+ };
+
+ /**
+ * Check for the possible race condition where a cohort member starts after the controller and
+ * therefore could miss a new operation
+ */
+ private final StartControllers startCohortFirst = new StartControllers() {
+
+ @Override
+ public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
+ ZooKeeperWatcher watcher, String operationName,
+ ProcedureCoordinator coordinator, String controllerName,
+ ProcedureMember member, List<String> expected) throws Exception {
+
+ // make a cohort controller for each expected node
+ List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
+ for (String nodeName : expected) {
+ ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
+ watcher, operationName, nodeName);
+ cc.start(member);
+ cohortControllers.add(cc);
+ }
+
+ // start the controller
+ ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
+ watcher, operationName, CONTROLLER_NODE_NAME);
+ controller.start(coordinator);
+
+ return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
+ controller, cohortControllers);
+ }
+ };
+}