You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/05 22:06:14 UTC
[1/3] HELIX-42: refactor batch message handling
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 66e7fa3..b0aa4ac 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -60,6 +60,7 @@ import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageTask;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
@@ -262,13 +263,14 @@ public class Mocks {
boolean completionInvoked = false;
@Override
- protected void reportCompletion(Message message) {
- System.out.println("Mocks.MockCMTaskExecutor.reportCompletion()");
+ public void finishTask(MessageTask task) {
+ System.out.println("Mocks.MockCMTaskExecutor.finishTask()");
completionInvoked = true;
}
+
public boolean isDone(String taskId) {
- Future<HelixTaskResult> future = _taskMap.get(taskId);
+ Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
if (future != null) {
return future.isDone();
}
@@ -790,5 +792,4 @@ public class Mocks {
}
}
-// >>>>>>> 5ef256eeced461eae733d568ad730aabeda3c0f2
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index c63f99a..5bc1fc0 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.helix.Mocks.MockStateModel;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
+import org.apache.helix.messaging.handling.HelixTask;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
@@ -76,14 +77,15 @@ public class TestHelixTaskExecutor
new HelixStateTransitionHandler(stateModel,
message,
context,
- currentStateDelta,
- executor);
+ currentStateDelta);
- executor.scheduleTask(message, handler, context);
- while (!executor.isDone(msgId + "/" + message.getPartitionName()))
- {
- Thread.sleep(500);
- }
+ HelixTask task = new HelixTask(message, context, handler, executor);
+ executor.scheduleTask(task);
+ for (int i = 0; i < 10; i++) {
+ if (!executor.isDone(task.getTaskId())) {
+ Thread.sleep(500);
+ }
+ }
AssertJUnit.assertTrue(stateModel.stateModelInvoked);
System.out.println("END TestCMTaskExecutor");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 95f7809..0910061 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -75,7 +75,7 @@ public class TestHelixTaskHandler
currentStateDelta.setState("TestDB_0", "OFFLINE");
HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message,
- context, currentStateDelta, executor);
+ context, currentStateDelta);
HelixTask handler;
handler = new HelixTask(message, context, stHandler, executor);
handler.call();
@@ -122,7 +122,7 @@ public class TestHelixTaskHandler
currentStateDelta.setState("TestDB_0", "OFFLINE");
HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message,
- context, currentStateDelta, executor);
+ context, currentStateDelta);
HelixTask handler = new HelixTask(message, context, stHandler, executor);
handler.call();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
new file mode 100644
index 0000000..cc9a4ce
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -0,0 +1,298 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.mock.participant.MockParticipant.ErrTransition;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBatchMessage extends ZkIntegrationTestBase
+{
+ class TestZkChildListener implements IZkChildListener
+ {
+ int _maxNbOfChilds = 0;
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+ {
+ System.out.println(parentPath + " has " + currentChilds.size() + " messages");
+ if (currentChilds.size() > _maxNbOfChilds)
+ {
+ _maxNbOfChilds = currentChilds.size();
+ }
+ }
+
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // enable batch message
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setBatchMessageMode(true);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ // registry a message listener so we know how many message generated
+ TestZkChildListener listener = new TestZkChildListener();
+ _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ // a non-batch-message run followed by a batch-message-enabled run
+ @Test
+ public void testChangeBatchMessageMode() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // stop all participants
+ Thread.sleep(1000);
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ // enable batch message
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setBatchMessageMode(true);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ // registry a message listener so we know how many message generated
+ TestZkChildListener listener = new TestZkChildListener();
+ _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+ // restart all participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSubMsgExecutionFail() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ final int n = 5;
+ MockParticipant[] participants = new MockParticipant[n];
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+// ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+
+ TestHelper.setupCluster(clusterName,
+ ZK_ADDR,
+ 12918,
+ "localhost",
+ "TestDB",
+ 1, // resource#
+ 6, // partition#
+ n, // nodes#
+ 3, // replicas#
+ "MasterSlave",
+ true);
+
+ // enable batch message
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setBatchMessageMode(true);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ if (i == 1)
+ {
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ new ErrTransition(errPartitions));
+ }
+ else
+ {
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ }
+ participants[i].syncStart();
+ }
+
+ Map<String, Map<String, String>> errStates =
+ new HashMap<String, Map<String, String>>();
+ errStates.put("TestDB0", new HashMap<String, String>());
+ errStates.get("TestDB0").put("TestDB0_4", "localhost_12919");
+ boolean result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStates));
+ Assert.assertTrue(result);
+
+ Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
+ errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12919"));
+
+ // verify "TestDB0_4", "localhost_12919" is in ERROR state
+ TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
deleted file mode 100644
index 83c0a78..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class TestGroupMessage extends ZkIntegrationTestBase
-{
- class TestZkChildListener implements IZkChildListener
- {
- int _maxNbOfChilds = 0;
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
- {
- System.out.println(parentPath + " has " + currentChilds.size() + " messages");
- if (currentChilds.size() > _maxNbOfChilds)
- {
- _maxNbOfChilds = currentChilds.size();
- }
- }
-
- }
-
- @Test
- public void testBasic() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // enable group message
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
- idealState.setGroupMessageMode(true);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- // registry a message listener so we know how many message generated
- TestZkChildListener listener = new TestZkChildListener();
- _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
-
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
- Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-
- // a non-group-message run followed by a group-message-enabled run
- @Test
- public void testChangeGroupMessageMode() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave",
- true); // do rebalance
-
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // stop all participants
- Thread.sleep(1000);
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- // enable group message
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
- idealState.setGroupMessageMode(true);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- // registry a message listener so we know how many message generated
- TestZkChildListener listener = new TestZkChildListener();
- _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
- // restart all participants
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
- Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 75a3146..f40cf09 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -125,9 +125,9 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
- ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg"));
- ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
+ ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg2"));
if(i != 0)
{
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 6ea0820..d84dd3c 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -445,7 +445,9 @@ public class TestHelixTaskExecutor
Thread.sleep(500);
for(int i = 0; i < nMsgs2; i++)
{
- executor.cancelTask(msgListToCancel.get(i), changeContext);
+ // executor.cancelTask(msgListToCancel.get(i), changeContext);
+ HelixTask task = new HelixTask(msgListToCancel.get(i), changeContext, null, null);
+ executor.cancelTask(task);
}
Thread.sleep(1500);
@@ -513,13 +515,13 @@ public class TestHelixTaskExecutor
NotificationContext changeContext = new NotificationContext(manager);
executor.onMessage("some", msgList, changeContext);
Thread.sleep(500);
- for(ExecutorService svc : executor._threadpoolMap.values())
+ for(ExecutorService svc : executor._executorMap.values())
{
Assert.assertFalse(svc.isShutdown());
}
Assert.assertTrue(factory._processedMsgIds.size() > 0);
- executor.shutDown();
- for(ExecutorService svc : executor._threadpoolMap.values())
+ executor.shutdown();
+ for(ExecutorService svc : executor._executorMap.values())
{
Assert.assertTrue(svc.isShutdown());
}
@@ -527,10 +529,10 @@ public class TestHelixTaskExecutor
}
@Test ()
- public void testRetryCount() throws InterruptedException
+ public void testNoRetry() throws InterruptedException
{
- String p = "test_";
- System.out.println(p.substring(p.lastIndexOf('_')+1));
+// String p = "test_";
+// System.out.println(p.substring(p.lastIndexOf('_')+1));
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -566,10 +568,29 @@ public class TestHelixTaskExecutor
AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
}
}
- factory.reset();
- msgList.clear();
+ }
+
+ @Test ()
+ public void testRetryOnce() throws InterruptedException
+ {
+// Logger.getRootLogger().setLevel(Level.INFO);
+
+// String p = "test_";
+// System.out.println(p.substring(p.lastIndexOf('_')+1));
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ CancellableHandlerFactory factory = new CancellableHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+
+ List<Message> msgList = new ArrayList<Message>();
+
+// factory.reset();
+// msgList.clear();
// Test the case that the message are executed for the second time
- nMsgs2 = 4;
+ int nMsgs2 = 4;
for(int i = 0; i < nMsgs2; i++)
{
Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index ca81836..be677be 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -60,7 +60,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
- ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
Assert.assertEquals(12, executor.getMaximumPoolSize());
taskcount += executor.getCompletedTaskCount();
Assert.assertTrue(executor.getCompletedTaskCount() > 0);