You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/05 22:06:14 UTC

[1/3] HELIX-42: refactor batch message handling

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 66e7fa3..b0aa4ac 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -60,6 +60,7 @@ import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageTask;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -262,13 +263,14 @@ public class Mocks {
 		boolean completionInvoked = false;
 
 		@Override
-		protected void reportCompletion(Message message) {
-			System.out.println("Mocks.MockCMTaskExecutor.reportCompletion()");
+		public void finishTask(MessageTask task) {
+			System.out.println("Mocks.MockCMTaskExecutor.finishTask()");
 			completionInvoked = true;
 		}
 
+
 		public boolean isDone(String taskId) {
-			Future<HelixTaskResult> future = _taskMap.get(taskId);
+			Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
 			if (future != null) {
 				return future.isDone();
 			}
@@ -790,5 +792,4 @@ public class Mocks {
 		}
 
 	}
-// >>>>>>> 5ef256eeced461eae733d568ad730aabeda3c0f2
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index c63f99a..5bc1fc0 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.helix.Mocks.MockStateModel;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.messaging.handling.AsyncCallbackService;
 import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
+import org.apache.helix.messaging.handling.HelixTask;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
@@ -76,14 +77,15 @@ public class TestHelixTaskExecutor
         new HelixStateTransitionHandler(stateModel,
                                         message,
                                         context,
-                                        currentStateDelta,
-                                        executor);
+                                        currentStateDelta);
 
-    executor.scheduleTask(message, handler, context);
-    while (!executor.isDone(msgId + "/" + message.getPartitionName()))
-    {
-      Thread.sleep(500);
-    }
+	HelixTask task = new HelixTask(message, context, handler, executor);
+	executor.scheduleTask(task);
+	for (int i = 0; i < 10; i++) {
+		if (!executor.isDone(task.getTaskId())) {
+			Thread.sleep(500);
+		}
+	}
     AssertJUnit.assertTrue(stateModel.stateModelInvoked);
     System.out.println("END TestCMTaskExecutor");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 95f7809..0910061 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -75,7 +75,7 @@ public class TestHelixTaskHandler
     currentStateDelta.setState("TestDB_0", "OFFLINE");
 
     HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message,
-        context, currentStateDelta, executor);
+        context, currentStateDelta);
     HelixTask handler;
     handler = new HelixTask(message, context, stHandler, executor);
     handler.call();
@@ -122,7 +122,7 @@ public class TestHelixTaskHandler
     currentStateDelta.setState("TestDB_0", "OFFLINE");
 
     HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message,
-        context, currentStateDelta, executor);
+        context, currentStateDelta);
 
     HelixTask handler = new HelixTask(message, context, stHandler, executor);
     handler.call();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
new file mode 100644
index 0000000..cc9a4ce
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -0,0 +1,298 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.mock.participant.MockParticipant.ErrTransition;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBatchMessage extends ZkIntegrationTestBase
+{
+  class TestZkChildListener implements IZkChildListener
+  {
+    int _maxNbOfChilds = 0;
+    
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+    {
+      System.out.println(parentPath + " has " + currentChilds.size() + " messages");
+      if (currentChilds.size() > _maxNbOfChilds)
+      {
+        _maxNbOfChilds = currentChilds.size();
+      }
+    }
+    
+  }
+  
+  @Test
+  public void testBasic() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            32, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    
+    // enable batch message
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setBatchMessageMode(true);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    // registry a message listener so we know how many message generated
+    TestZkChildListener listener = new TestZkChildListener();
+    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+    
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+  
+  // a non-batch-message run followed by a batch-message-enabled run
+  @Test
+  public void testChangeBatchMessageMode() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            32, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // stop all participants
+    Thread.sleep(1000);
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+    
+    // enable batch message
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setBatchMessageMode(true);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    // registry a message listener so we know how many message generated
+    TestZkChildListener listener = new TestZkChildListener();
+    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+    // restart all participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+    
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSubMsgExecutionFail() throws Exception
+  {
+	    String className = TestHelper.getTestClassName();
+	    String methodName = TestHelper.getTestMethodName();
+	    String clusterName = className + "_" + methodName;
+	    
+	    final int n = 5;
+	    MockParticipant[] participants = new MockParticipant[n];
+
+	    System.out.println("START " + clusterName + " at "
+	        + new Date(System.currentTimeMillis()));
+//	    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+
+	    TestHelper.setupCluster(clusterName,
+	                            ZK_ADDR,
+	                            12918,
+	                            "localhost",
+	                            "TestDB",
+	                            1,	// resource#
+	                            6,	// partition#
+	                            n,	// nodes#
+	                            3,  // replicas#
+	                            "MasterSlave",
+	                            true);
+
+	    // enable batch message
+	    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+	    Builder keyBuilder = accessor.keyBuilder();
+	    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+	    idealState.setBatchMessageMode(true);
+	    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+	    TestHelper.startController(clusterName,
+	                               "controller_0",
+	                               ZK_ADDR,
+	                               HelixControllerMain.STANDALONE);
+	    for (int i = 0; i < n; i++)
+	    {
+	      String instanceName = "localhost_" + (12918 + i);
+
+	      if (i == 1)
+	      {
+	        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+	        errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+	        participants[i] =
+	            new MockParticipant(clusterName,
+	                                instanceName,
+	                                ZK_ADDR,
+	                                new ErrTransition(errPartitions));
+	      }
+	      else
+	      {
+	        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+	      }
+	      participants[i].syncStart();
+	    }
+
+	    Map<String, Map<String, String>> errStates =
+	        new HashMap<String, Map<String, String>>();
+	    errStates.put("TestDB0", new HashMap<String, String>());
+	    errStates.get("TestDB0").put("TestDB0_4", "localhost_12919");
+	    boolean result =
+	        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                          clusterName,
+	                                                                                          errStates));
+	    Assert.assertTrue(result);
+
+	    Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
+	    errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12919"));
+
+	    // verify "TestDB0_4", "localhost_12919" is in ERROR state
+	    TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+	    
+	    System.out.println("END " + clusterName + " at "
+	            + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
deleted file mode 100644
index 83c0a78..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class TestGroupMessage extends ZkIntegrationTestBase
-{
-  class TestZkChildListener implements IZkChildListener
-  {
-    int _maxNbOfChilds = 0;
-    
-    @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
-    {
-      System.out.println(parentPath + " has " + currentChilds.size() + " messages");
-      if (currentChilds.size() > _maxNbOfChilds)
-      {
-        _maxNbOfChilds = currentChilds.size();
-      }
-    }
-    
-  }
-  
-  @Test
-  public void testBasic() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    int n = 2;
-
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            32, // partitions per resource
-                            n, // number of nodes
-                            2, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-    
-    // enable group message
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
-    idealState.setGroupMessageMode(true);
-    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
-    // registry a message listener so we know how many message generated
-    TestZkChildListener listener = new TestZkChildListener();
-    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
-    
-    ClusterController controller =
-        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-    controller.syncStart();
-
-    // start participants
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-    
-    // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    controller.syncStop();
-    for (int i = 0; i < n; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-  
-  // a non-group-message run followed by a group-message-enabled run
-  @Test
-  public void testChangeGroupMessageMode() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    int n = 2;
-
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            32, // partitions per resource
-                            n, // number of nodes
-                            2, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-    
-    ClusterController controller =
-        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-    controller.syncStart();
-
-    // start participants
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-    
-    // stop all participants
-    Thread.sleep(1000);
-    for (int i = 0; i < n; i++)
-    {
-      participants[i].syncStop();
-    }
-    
-    // enable group message
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
-    idealState.setGroupMessageMode(true);
-    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
-    // registry a message listener so we know how many message generated
-    TestZkChildListener listener = new TestZkChildListener();
-    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
-    // restart all participants
-    for (int i = 0; i < n; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
-    }
-    
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-
-    
-    // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    controller.syncStop();
-    for (int i = 0; i < n; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 75a3146..f40cf09 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -125,9 +125,9 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
       
       DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
       HelixTaskExecutor helixExecutor = svc.getExecutor();
-      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
+      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg"));
       
-      ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
+      ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg2"));
       if(i != 0)
       {
         

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 6ea0820..d84dd3c 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -445,7 +445,9 @@ public class TestHelixTaskExecutor
     Thread.sleep(500);
     for(int i = 0; i < nMsgs2; i++)
     {
-      executor.cancelTask(msgListToCancel.get(i), changeContext);
+      // executor.cancelTask(msgListToCancel.get(i), changeContext);
+      HelixTask task = new HelixTask(msgListToCancel.get(i), changeContext, null, null);
+      executor.cancelTask(task);
     }
     Thread.sleep(1500);
 
@@ -513,13 +515,13 @@ public class TestHelixTaskExecutor
       NotificationContext changeContext = new NotificationContext(manager);
       executor.onMessage("some", msgList, changeContext);
       Thread.sleep(500);
-      for(ExecutorService svc : executor._threadpoolMap.values())
+      for(ExecutorService svc : executor._executorMap.values())
       {
         Assert.assertFalse(svc.isShutdown());
       }
       Assert.assertTrue(factory._processedMsgIds.size() > 0);
-      executor.shutDown();
-      for(ExecutorService svc : executor._threadpoolMap.values())
+      executor.shutdown();
+      for(ExecutorService svc : executor._executorMap.values())
       {
         Assert.assertTrue(svc.isShutdown());
       }
@@ -527,10 +529,10 @@ public class TestHelixTaskExecutor
   }
   
   @Test ()
-  public void testRetryCount() throws InterruptedException
+  public void testNoRetry() throws InterruptedException
   {
-    String p = "test_";
-    System.out.println(p.substring(p.lastIndexOf('_')+1));
+//    String p = "test_";
+//    System.out.println(p.substring(p.lastIndexOf('_')+1));
     HelixTaskExecutor executor = new HelixTaskExecutor();
     HelixManager manager = new MockClusterManager();
 
@@ -566,10 +568,29 @@ public class TestHelixTaskExecutor
         AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
       }
     }
-    factory.reset();
-    msgList.clear();
+  }
+  
+  @Test ()
+  public void testRetryOnce() throws InterruptedException
+  {
+//	  Logger.getRootLogger().setLevel(Level.INFO);
+
+//    String p = "test_";
+//    System.out.println(p.substring(p.lastIndexOf('_')+1));
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    CancellableHandlerFactory factory = new CancellableHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+
+    List<Message> msgList = new ArrayList<Message>();
+
+//    factory.reset();
+//    msgList.clear();
     // Test the case that the message are executed for the second time
-    nMsgs2 = 4;
+    int nMsgs2 = 4;
     for(int i = 0; i < nMsgs2; i++)
     {
       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index ca81836..be677be 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -60,7 +60,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
       
       DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
       HelixTaskExecutor helixExecutor = svc.getExecutor();
-      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
+      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
       Assert.assertEquals(12, executor.getMaximumPoolSize());
       taskcount += executor.getCompletedTaskCount();
       Assert.assertTrue(executor.getCompletedTaskCount() > 0);