You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[34/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
new file mode 100644
index 0000000..9bf79b8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockJobIntf;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase
+{
+  private final String _clusterName = "CLUSTER_" + getShortClassName();
+  private final int _nodeNb = 5;
+  private final int _startPort = 12918;
+  private final MockCallback _callback = new MockCallback();
+
+  class MockCallback implements CustomCodeCallbackHandler
+  {
+    boolean _isCallbackInvoked;
+
+    @Override
+    public void onCallback(NotificationContext context)
+    {
+      HelixManager manager = context.getManager();
+      Type type = context.getType();
+      _isCallbackInvoked = true;
+//      System.out.println(type + ": TestCallback invoked on " + manager.getInstanceName());
+    }
+
+  }
+
+  class MockJob implements MockJobIntf
+  {
+    @Override
+    public void doPreConnectJob(HelixManager manager)
+    {
+      try
+      {
+        // delay the start of the 1st participant
+        //  so there will be a leadership transfer from localhost_12919 to 12918
+        if (manager.getInstanceName().equals("localhost_12918"))
+        {
+          Thread.sleep(2000);
+        }
+
+        HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
+        customCodeRunner.invoke(_callback)
+                         .on(ChangeType.LIVE_INSTANCE)
+                         .usingLeaderStandbyModel("TestParticLeader")
+                         .start();
+      } catch (Exception e)
+      {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void doPostConnectJob(HelixManager manager)
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+  }
+
+  @Test
+  public void testCustomCodeRunner() throws Exception
+  {
+    System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(_clusterName,
+                            ZK_ADDR,
+                            _startPort,
+                            "localhost",  // participant name prefix
+                            "TestDB",     // resource name prefix
+                            1,  // resourceNb
+                            5,  // partitionNb
+                            _nodeNb,  // nodesNb
+                            _nodeNb,  // replica
+                            "MasterSlave",
+                            true);
+
+    TestHelper.startController(_clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+
+    MockParticipant[] partics = new MockParticipant[5];
+    for (int i = 0; i < _nodeNb; i++)
+    {
+      String instanceName =  "localhost_" + (_startPort + i);
+
+      partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR,
+                        null, new MockJob());
+      partics[i].syncStart();
+//      new Thread(partics[i]).start();
+    }
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+    Assert.assertTrue(result);
+
+    Thread.sleep(1000);  // wait for the INIT type callback to finish
+    Assert.assertTrue(_callback._isCallbackInvoked);
+    _callback._isCallbackInvoked = false;
+
+    // add a new live instance
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
+    newLiveIns.setHelixVersion("0.0.0");
+    newLiveIns.setSessionId("randomSessionId");
+    accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
+
+    Thread.sleep(1000);  // wait for the CALLBACK type callback to finish
+    Assert.assertTrue(_callback._isCallbackInvoked);
+
+    System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
new file mode 100644
index 0000000..dd74b68
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+public class TestHelixUsingDifferentParams extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestHelixUsingDifferentParams.class);
+
+  @Test()
+  public void testCMUsingDifferentParams() throws Exception
+  {
+    System.out.println("START " + getShortClassName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    int numResourceArray[] = new int[] { 1 }; // , 2}; // , 3, 6};
+    int numPartitionsPerResourceArray[] = new int[] { 10 }; // , 20, 50, 100}; // ,
+                                                        // 1000};
+    int numInstances[] = new int[] { 5 }; // , 10}; // , 50, 100, 1000};
+    int replicas[] = new int[] { 2 }; // , 3}; //, 4, 5};
+
+    for (int numResources : numResourceArray)
+    {
+      for (int numPartitionsPerResource : numPartitionsPerResourceArray)
+      {
+        for (int numInstance : numInstances)
+        {
+          for (int replica : replicas)
+          {
+            String uniqClusterName = "TestDiffParam_" + "rg" + numResources + "_p"
+                + numPartitionsPerResource + "_n" + numInstance + "_r" + replica;
+            System.out.println("START " + uniqClusterName + " at "
+                + new Date(System.currentTimeMillis()));
+
+            TestDriver.setupCluster(uniqClusterName, ZK_ADDR, numResources,
+                numPartitionsPerResource, numInstance, replica);
+
+            for (int i = 0; i < numInstance; i++)
+            {
+              TestDriver.startDummyParticipant(uniqClusterName, i);
+            }
+
+            TestDriver.startController(uniqClusterName);
+            TestDriver.verifyCluster(uniqClusterName, 1000, 50 * 1000);
+            TestDriver.stopCluster(uniqClusterName);
+
+            System.out.println("END " + uniqClusterName + " at "
+                + new Date(System.currentTimeMillis()));
+          }
+        }
+      }
+    }
+
+    System.out
+        .println("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
new file mode 100644
index 0000000..60cfe7b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
@@ -0,0 +1,78 @@
+package org.apache.helix.integration;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase
+{
+  @Test
+  public void testStateMismatch() throws InterruptedException
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+    Map<String, LiveInstance> liveinstanceMap = accessor.getChildValuesMap(accessor.keyBuilder().liveInstances());
+    
+    for(String instanceName : liveinstanceMap.keySet())
+    {
+      String sessionid = liveinstanceMap.get(instanceName).getSessionId();
+      for(String partition : ev.getPartitionSet())
+      {
+        if(ev.getStateMap(partition).containsKey(instanceName))
+        {
+          String uuid = UUID.randomUUID().toString();
+          Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+          boolean rand = new Random().nextInt(10) > 5;
+          if(ev.getStateMap(partition).get(instanceName).equals("MASTER"))
+          {
+            message.setSrcName(manager.getInstanceName());
+            message.setTgtName(instanceName);
+            message.setMsgState(MessageState.NEW);
+            message.setPartitionName(partition);
+            message.setResourceName(TEST_DB);
+            message.setFromState(rand ? "SLAVE" : "OFFLINE");
+            message.setToState(rand ? "MASTER" : "SLAVE");
+            message.setTgtSessionId(sessionid);
+            message.setSrcSessionId(manager.getSessionId());
+            message.setStateModelDef("MasterSlave");
+            message.setStateModelFactoryName("DEFAULT"); 
+          }
+          else if (ev.getStateMap(partition).get(instanceName).equals("SLAVE"))
+          {
+            message.setSrcName(manager.getInstanceName());
+            message.setTgtName(instanceName);
+            message.setMsgState(MessageState.NEW);
+            message.setPartitionName(partition);
+            message.setResourceName(TEST_DB);
+            message.setFromState(rand ? "MASTER" : "OFFLINE");
+            message.setToState(rand ? "SLAVE" : "SLAVE");
+            message.setTgtSessionId(sessionid);
+            message.setSrcSessionId(manager.getSessionId());
+            message.setStateModelDef("MasterSlave");
+            message.setStateModelFactoryName("DEFAULT");
+          }
+          accessor.setProperty(accessor.keyBuilder().message(instanceName, message.getMsgId()), message);
+        }
+      }
+    }
+    Thread.sleep(3000);
+    ExternalView ev2 = accessor.getProperty(kb.externalView(TEST_DB));
+    Assert.assertTrue(ev.equals(ev2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
new file mode 100644
index 0000000..45e948f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -0,0 +1,144 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestMessageThrottle extends ZkIntegrationTestBase
+{
+  @Test()
+  public void testMessageThrottle() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+    // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
+                                                         // port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // setup message constraint
+    // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,CONSTRAINT_VALUE=1";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    // constraints.put("TRANSITION", "OFFLINE-SLAVE");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    admin.addMessageConstraint(clusterName, "constraint1", constraints);
+    
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+    // make sure we never see more than 1 state transition message for each participant
+    final AtomicBoolean success = new AtomicBoolean(true);
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      String msgPath =
+          PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName);
+
+      _gZkClient.subscribeChildChanges(msgPath, new IZkChildListener()
+      {
+
+        @Override
+        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+        {
+          if (currentChilds != null && currentChilds.size() > 1)
+          {
+            List<ZNRecord> records = accessor.getBaseDataAccessor().getChildren(parentPath, null, 0);
+            int transitionMsgCount = 0;
+            for (ZNRecord record : records)
+            {
+              Message msg = new Message(record);
+              if(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
+              {
+                transitionMsgCount++;
+              }
+            }
+
+            if (transitionMsgCount > 1)
+            {
+              success.set(false);
+              Assert.fail("Should not see more than 1 message");
+            }
+          }
+          
+          
+        }
+      });
+    }
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              clusterName));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    Assert.assertTrue(success.get());
+    
+
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
new file mode 100644
index 0000000..3dfb55d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -0,0 +1,478 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.HashSet;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  public static class TestMessagingHandlerFactory implements
+      MessageHandlerFactory
+  {
+    public static HashSet<String> _processedMsgIds = new HashSet<String>();
+
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      return new TestMessagingHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      return "TestExtensibility";
+    }
+
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+    public static class TestMessagingHandler extends MessageHandler
+    {
+      public TestMessagingHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        Thread.sleep(1000);
+        System.out.println("TestMessagingHandler " + _message.getMsgId());
+        _processedMsgIds.add(_message.getRecord().getSimpleField(
+            "TestMessagingPara"));
+        result.getTaskResultMap().put("ReplyMessage", "TestReplyMessage");
+        return result;
+      }
+
+
+      @Override
+      public void onError( Exception e, ErrorCode code, ErrorType type)
+      {
+        // TODO Auto-generated method stub
+
+      }
+    }
+  }
+
+  @Test()
+  public void TestMessageSimpleSend() throws Exception
+  {
+    String hostSrc = "localhost_" + START_PORT;
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _startCMResultMap.get(hostDest)._manager.getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageType(),msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+
+    int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    AssertJUnit.assertTrue(nMsgs == 1);
+    Thread.sleep(2500);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds
+        .contains(para));
+    
+    cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setDataSource(DataSource.IDEALSTATES);
+
+    nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    AssertJUnit.assertTrue(nMsgs == 1);
+    Thread.sleep(2500);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds
+        .contains(para));
+
+  }
+
+  public static class MockAsyncCallback extends AsyncCallback
+  {
+
+    public MockAsyncCallback()
+    {
+    }
+
+    @Override
+    public void onTimeOut()
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+  }
+
+  public static class TestAsyncCallback extends AsyncCallback
+  {
+    public TestAsyncCallback(long timeout)
+    {
+      super(timeout);
+    }
+
+    static HashSet<String> _replyedMessageContents = new HashSet<String>();
+    public boolean timeout = false;
+
+    @Override
+    public void onTimeOut()
+    {
+      timeout = true;
+    }
+
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      // TODO Auto-generated method stub
+      System.out.println("OnreplyMessage: "
+          + message.getRecord()
+              .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+              .get("ReplyMessage"));
+      if(message.getRecord()
+          .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+          .get("ReplyMessage") == null)
+      {
+        int x = 0;
+        x++;
+      }
+      _replyedMessageContents.add(message.getRecord()
+          .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+          .get("ReplyMessage"));
+    }
+
+  }
+
+  @Test()
+  public void TestMessageSimpleSendReceiveAsync() throws Exception
+  {
+    String hostSrc = "localhost_" + START_PORT;
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _startCMResultMap.get(hostDest)._manager.getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageType(),msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+
+    TestAsyncCallback callback = new TestAsyncCallback(60000);
+
+    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+
+    Thread.sleep(2000);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents
+        .contains("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
+
+    TestAsyncCallback callback2 = new TestAsyncCallback(500);
+    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+
+    Thread.sleep(3000);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+    
+    cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setDataSource(DataSource.IDEALSTATES);
+
+    callback = new TestAsyncCallback(60000);
+
+    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+
+    Thread.sleep(2000);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents
+        .contains("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
+
+    callback2 = new TestAsyncCallback(500);
+    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+
+    Thread.sleep(3000);
+    // Thread.currentThread().join();
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+
+  }
+
+  @Test()
+  public void TestBlockingSendReceive() throws Exception
+  {
+    String hostSrc = "localhost_" + START_PORT;
+    String hostDest = "localhost_" + (START_PORT + 1);
+
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    _startCMResultMap.get(hostDest)._manager.getMessagingService()
+        .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(factory.getMessageType(),msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(hostDest);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+
+    AsyncCallback asyncCallback = new MockAsyncCallback();
+    int messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, asyncCallback, 60000);
+
+    AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+        .get("ReplyMessage").equals("TestReplyMessage"));
+    AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
+
+
+    AsyncCallback asyncCallback2 = new MockAsyncCallback();
+    messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, asyncCallback2, 500);
+    AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
+
+  }
+
+  @Test()
+  public void TestMultiMessageCriteria() throws Exception
+  {
+    String hostSrc = "localhost_" + START_PORT;
+
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageType(), factory);
+    }
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(
+        new TestMessagingHandlerFactory().getMessageType(),msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName("%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    AsyncCallback callback1 = new MockAsyncCallback();
+    int messageSent1 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback1, 2000);
+
+    AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+        .get("ReplyMessage").equals("TestReplyMessage"));
+    AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
+
+    AsyncCallback callback2 = new MockAsyncCallback();
+    int messageSent2 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback2, 500);
+    AssertJUnit.assertTrue(callback2.isTimedOut());
+
+    cr.setPartition("TestDB_17");
+    AsyncCallback callback3 = new MockAsyncCallback();
+    int messageSent3 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback3, 2000);
+    AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
+
+
+    cr.setPartition("TestDB_15");
+    AsyncCallback callback4 = new MockAsyncCallback();
+    int messageSent4 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback4, 2000);
+    AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
+    
+    cr.setPartitionState("SLAVE");
+    AsyncCallback callback5 = new MockAsyncCallback();
+    int messageSent5 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback5, 2000);
+    AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
+    
+    cr.setDataSource(DataSource.IDEALSTATES);
+    AsyncCallback callback6 = new MockAsyncCallback();
+    int messageSent6 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback6, 2000);
+    AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
+  }
+
+  @Test()
+  public void sendSelfMsg()
+  {
+    String hostSrc = "localhost_" + START_PORT;
+
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageType(), factory);
+    }
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(
+        new TestMessagingHandlerFactory().getMessageType(),msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName("%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setSelfExcluded(false);
+    AsyncCallback callback1 = new MockAsyncCallback();
+    int messageSent1 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback1, 3000);
+
+    AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
+    AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+                           .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+                           .get("ReplyMessage").equals("TestReplyMessage"));
+  }
+
+  @Test()
+  public void TestControllerMessage() throws Exception
+  {
+    String hostSrc = "localhost_" + START_PORT;
+
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageType(), factory);
+    }
+    String msgId = new UUID(123, 456).toString();
+    Message msg = new Message(MessageType.CONTROLLER_MSG,msgId);
+    msg.setMsgId(msgId);
+    msg.setSrcName(hostSrc);
+
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+    String para = "Testing messaging para";
+    msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName("*");
+    cr.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr.setSessionSpecific(false);
+
+    AsyncCallback callback1 = new MockAsyncCallback();
+    int messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback1, 2000);
+
+    AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+        .get("ControllerResult").indexOf(hostSrc) != -1);
+    AssertJUnit.assertTrue(callback1.getMessageReplied().size() == 1);
+
+    msgId = UUID.randomUUID().toString();
+    msg.setMsgId(msgId);
+    cr.setPartition("TestDB_17");
+    AsyncCallback callback2 = new MockAsyncCallback();
+    messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback2, 2000);
+    AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+        .get("ControllerResult").indexOf(hostSrc) != -1);
+
+    AssertJUnit.assertTrue(callback2.getMessageReplied().size() == 1);
+
+    msgId = UUID.randomUUID().toString();
+    msg.setMsgId(msgId);
+    cr.setPartitionState("SLAVE");
+    AsyncCallback callback3 = new MockAsyncCallback();
+    messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+        .sendAndWait(cr, msg, callback3, 2000);
+    AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
+        .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+        .get("ControllerResult").indexOf(hostSrc) != -1);
+
+    AssertJUnit.assertTrue(callback3.getMessageReplied().size() == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
new file mode 100644
index 0000000..636dd93
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -0,0 +1,113 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.MockBootstrapModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestNonOfflineInitState extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestNonOfflineInitState.class);
+
+  @Test
+  public void testNonOfflineInitState() throws Exception
+  {
+    System.out.println("START testNonOfflineInitState at "
+        + new Date(System.currentTimeMillis()));
+    String clusterName = getShortClassName();
+
+    setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                 "localhost", // participant name prefix
+                 "TestDB", // resource name prefix
+                 1, // resources
+                 10, // partitions per resource
+                 5, // number of nodes
+                 1, // replicas
+                 "Bootstrap",
+                 true); // do rebalance
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[5];
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+
+      // add a state model with non-OFFLINE initial state
+      StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+      MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
+      stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    System.out.println("END testNonOfflineInitState at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  private static void setupCluster(String clusterName,
+                                   String ZkAddr,
+                                   int startPort,
+                                   String participantNamePrefix,
+                                   String resourceNamePrefix,
+                                   int resourceNb,
+                                   int partitionNb,
+                                   int nodesNb,
+                                   int replica,
+                                   String stateModelDef,
+                                   boolean doRebalance) throws Exception
+  {
+    ZkClient zkClient = new ZkClient(ZkAddr);
+    if (zkClient.exists("/" + clusterName))
+    {
+      LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+      zkClient.deleteRecursive("/" + clusterName);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+    setupTool.addCluster(clusterName, true);
+    setupTool.addStateModelDef(clusterName,
+                               "Bootstrap",
+                               TestHelper.generateStateModelDefForBootstrap());
+
+    for (int i = 0; i < nodesNb; i++)
+    {
+      int port = startPort + i;
+      setupTool.addInstanceToCluster(clusterName, participantNamePrefix + ":" + port);
+    }
+
+    for (int i = 0; i < resourceNb; i++)
+    {
+      String dbName = resourceNamePrefix + i;
+      setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef);
+      if (doRebalance)
+      {
+        setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+      }
+    }
+    zkClient.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
new file mode 100644
index 0000000..33dd30c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -0,0 +1,78 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestNullReplica extends ZkIntegrationTestBase
+{
+
+  @Test
+  public void testNullReplica() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    // set replica in ideal state to null
+    String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+    ZNRecord idealState = _gZkClient.readData(idealStatePath);
+    idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
+    _gZkClient.writeData(idealStatePath, idealState);
+    
+    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+    
+    Thread.sleep(2000);
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
new file mode 100644
index 0000000..02103e3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -0,0 +1,76 @@
+package org.apache.helix.integration;
+
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase
+{
+  @Test()
+  public void TestParticipantErrorMessageSend()
+  {
+    String participant1 = "localhost_" + START_PORT;
+    String participant2 = "localhost_" + (START_PORT + 1);
+    
+    Message errorMessage1
+      = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
+    errorMessage1.setTgtSessionId("*");
+    errorMessage1.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_INSTANCE.toString());
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+    recipientCriteria.setSessionSpecific(false);
+    _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, errorMessage1);
+    
+    Message errorMessage2
+    = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
+    errorMessage2.setTgtSessionId("*");
+    errorMessage2.setResourceName("TestDB");
+    errorMessage2.setPartitionName("TestDB_14");
+    errorMessage2.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_PARTITION.toString());
+    Criteria recipientCriteria2 = new Criteria();
+    recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    recipientCriteria2.setSessionSpecific(false);
+    _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, errorMessage2);
+  
+    try
+    {
+      Thread.sleep(1500);
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+    Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+    ExternalView externalView = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
+    
+    for(String partitionName : externalView.getRecord().getMapFields().keySet())
+    {
+      for(String hostName :  externalView.getRecord().getMapField(partitionName).keySet())
+      {
+        if(hostName.equals(participant1))
+        {
+          Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName).equalsIgnoreCase("OFFLINE"));
+        }
+      }
+    }
+    Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2).equalsIgnoreCase("OFFLINE"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
new file mode 100644
index 0000000..9715380
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+
+public class TestParticipantNameCollision extends ZkStandAloneCMTestBase
+{
+  private static Logger logger = Logger.getLogger(TestParticipantNameCollision.class);
+
+  @Test()
+  public void testParticiptantNameCollision() throws Exception
+  {
+    logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
+
+    StartCMResult result = null;
+    for (int i = 0; i < 1; i++)
+    {
+      String instanceName = "localhost_" + (START_PORT + i);
+      try
+      {
+        // the call fails on getClusterManagerForParticipant()
+        // no threads start
+        result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+      }
+      catch (Exception e)
+      {
+        e.printStackTrace();
+      }
+    }
+
+
+    Thread.sleep(30000);
+    TestHelper.verifyWithTimeout("verifyNotConnected", result._manager);
+
+    logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
new file mode 100644
index 0000000..0c00912
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -0,0 +1,121 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestPauseSignal extends ZkIntegrationTestBase
+{
+  @Test()
+  public void testPauseSignal() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // pause the cluster and make sure pause is persistent
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+    final HelixDataAccessor tmpAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    
+    String cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " false";
+    ClusterSetup.processCommandLineArgs(cmd.split(" "));
+    
+    tmpAccessor.setProperty(tmpAccessor.keyBuilder().pause(), new PauseSignal("pause"));
+    zkClient.close();
+    
+    // wait for controller to be signaled by pause
+    Thread.sleep(1000);
+
+    // add a new resource group
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
+    setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+    // make sure TestDB1 external view is empty
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+                                 1000,
+                                 clusterName,
+                                 "TestDB1",
+                                 TestHelper.<String> setOf("localhost_12918",
+                                                           "localhost_12919",
+                                                           "localhost_12920",
+                                                           "localhost_12921",
+                                                           "localhost_12922"),
+                                 ZK_ADDR);
+
+    // resume controller
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+    cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " true";
+    ClusterSetup.processCommandLineArgs(cmd.split(" "));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    Thread.sleep(2000);
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
new file mode 100644
index 0000000..feb487b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestRenamePartition extends ZkIntegrationTestBase
+{
+  @Test()
+  public void testRenamePartitionAutoIS() throws Exception
+  {
+    String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave", true); // do rebalance
+
+
+    startAndVerify(clusterName);
+    
+    // rename partition name TestDB0_0 tp TestDB0_100
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    
+    List<String> prioList = idealState.getRecord().getListFields().remove("TestDB0_0");
+    idealState.getRecord().getListFields().put("TestDB0_100", prioList);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+  
+  @Test()
+  public void testRenamePartitionCustomIS() throws Exception
+  {
+    
+    String clusterName = "CLUSTER_" + getShortClassName() + "_custom";
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave", false); // do rebalance
+
+    // calculate idealState
+    List<String> instanceNames = Arrays.asList("localhost_12918", "localhost_12919", "localhost_12920",
+        "localhost_12921", "localhost_12922");
+    ZNRecord destIS = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
+        10, 3-1, "TestDB0", "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(destIS);
+    idealState.setIdealStateMode("CUSTOMIZED");
+    idealState.setReplicas("3");
+    idealState.setStateModelDefRef("MasterSlave");
+    
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    startAndVerify(clusterName);
+    
+    Map<String, String> stateMap = idealState.getRecord().getMapFields().remove("TestDB0_0");
+    idealState.getRecord().getMapFields().put("TestDB0_100", stateMap);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+  
+  private void startAndVerify(String clusterName) throws Exception
+  {
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+//      new Thread(participants[i]).start();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
new file mode 100644
index 0000000..24ee3a3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -0,0 +1,113 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResetInstance extends ZkIntegrationTestBase
+{
+
+  @Test
+  public void testResetInstance() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+    {
+      {
+        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+      }
+    };
+
+    // start mock participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      }
+      participants[i].syncStart();
+    }
+
+    // verify cluster
+    Map<String, Map<String, String>> errStateMap =
+        new HashMap<String, Map<String, String>>();
+    errStateMap.put("TestDB0", new HashMap<String, String>());
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                       clusterName,
+                                                                                                       errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    // reset node "localhost_12918"
+    participants[0].setTransition(null);
+    String command =
+        "--zkSvr " + ZK_ADDR + " --resetInstance " + clusterName
+            + " localhost_12918";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                       clusterName)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
new file mode 100644
index 0000000..4860778
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -0,0 +1,224 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResetPartitionState extends ZkIntegrationTestBase
+{
+  int _errToOfflineInvoked = 0;
+
+  class ErrTransitionWithResetCnt extends ErrTransition
+  {
+    public ErrTransitionWithResetCnt(Map<String, Set<String>> errPartitions)
+    {
+      super(errPartitions);
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      // System.err.println("doReset() invoked");
+      super.doTransition(message, context);
+      String fromState = message.getFromState();
+      String toState = message.getToState();
+      if (fromState.equals("ERROR") && toState.equals("OFFLINE"))
+      {
+        _errToOfflineInvoked++;
+      }
+    }
+
+  }
+
+  @Test()
+  public void testResetPartitionState() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+    {
+      {
+        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+      }
+    };
+
+    // start mock participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      }
+      participants[i].syncStart();
+    }
+
+    // verify cluster
+    Map<String, Map<String, String>> errStateMap =
+        new HashMap<String, Map<String, String>>();
+    errStateMap.put("TestDB0", new HashMap<String, String>());
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                       clusterName,
+                                                                                                       errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // reset a non-exist partition, should throw exception
+    try
+    {
+      String command =
+          "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+              + " localhost_12918 TestDB0 TestDB0_nonExist";
+      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+      Assert.fail("Should throw exception on reset a non-exist partition");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+
+    // reset one error partition
+    errPartitions.remove("SLAVE-MASTER");
+    participants[0].setTransition(new ErrTransitionWithResetCnt(errPartitions));
+    clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_4");
+    _errToOfflineInvoked = 0;
+    String command =
+        "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+            + " localhost_12918 TestDB0 TestDB0_4";
+
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    Thread.sleep(200); // wait reset to be done
+    try
+    {
+      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+      Assert.fail("Should throw exception on reset a partition not in ERROR state");
+    }
+    catch (Exception e)
+    {
+      // OK
+    }
+
+    errStateMap.get("TestDB0").remove("TestDB0_4");
+    result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                   clusterName,
+                                                                                                   errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    Assert.assertEquals(_errToOfflineInvoked, 1);
+
+    // reset the other error partition
+    participants[0].setTransition(new ErrTransitionWithResetCnt(null));
+    clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_8");
+
+    command =
+        "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+            + " localhost_12918 TestDB0 TestDB0_8";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                   clusterName));
+    Assert.assertTrue(result, "Cluster verification fails");
+    Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+  private void clearStatusUpdate(String clusterName,
+                                 String instance,
+                                 String resource,
+                                 String partition)
+  {
+    // clear status update for error partition so verify() will not fail on old
+    // errors
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
+    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance,
+                                                             liveInstance.getSessionId(),
+                                                             resource,
+                                                             partition));
+
+  }
+  // TODO: throw exception in reset()
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
new file mode 100644
index 0000000..1d199d3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -0,0 +1,112 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResetResource extends ZkIntegrationTestBase
+{
+  @Test
+  public void testResetNode() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+    {
+      {
+        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+      }
+    };
+
+    // start mock participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      }
+      participants[i].syncStart();
+    }
+
+    // verify cluster
+    Map<String, Map<String, String>> errStateMap =
+        new HashMap<String, Map<String, String>>();
+    errStateMap.put("TestDB0", new HashMap<String, String>());
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                       clusterName,
+                                                                                                       errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    // reset resource "TestDB0"
+    participants[0].setTransition(null);
+    String command =
+        "--zkSvr " + ZK_ADDR + " --resetResource " + clusterName
+            + " TestDB0";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                       clusterName)));
+    Assert.assertTrue(result, "Cluster verification fails");
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
new file mode 100644
index 0000000..7df10dc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -0,0 +1,115 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestRestartParticipant extends ZkIntegrationTestBase
+{
+  public class KillOtherTransition extends MockTransition
+  {
+    final AtomicReference<MockParticipant> _other;
+
+    public KillOtherTransition(MockParticipant other)
+    {
+      _other = new AtomicReference<MockParticipant>(other);
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      MockParticipant other = _other.getAndSet(null);
+      if (other != null)
+      {
+        System.err.println("Kill " + other.getInstanceName()
+            + ". Interrupted exceptions are IGNORABLE");
+        other.syncStop();
+      }
+    }
+  }
+
+  @Test()
+  public void testRestartParticipant() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START testRestartParticipant at "
+        + new Date(System.currentTimeMillis()));
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 4)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new KillOtherTransition(participants[0]));
+      }
+      else
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                null);
+//        Thread.sleep(100);
+      }
+
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // restart
+    Thread.sleep(500);
+    MockParticipant participant =
+        new MockParticipant(participants[0].getClusterName(),
+                            participants[0].getInstanceName(),
+                            ZK_ADDR,
+                            null);
+    System.err.println("Restart " + participant.getInstanceName());
+    participant.syncStart();
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    System.out.println("START testRestartParticipant at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}