You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[39/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
deleted file mode 100644
index e30ccba..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.ErrTransition;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResetResource extends ZkIntegrationTestBase
-{
-  @Test
-  public void testResetNode() throws Exception
-  {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    final int n = 5;
-
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            n, // number of nodes
-                            3, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-
-    // start controller
-    ClusterController controller =
-        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-    controller.syncStart();
-
-    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
-    {
-      {
-        put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
-      }
-    };
-
-    // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      if (i == 0)
-      {
-        participants[i] =
-            new MockParticipant(clusterName,
-                                instanceName,
-                                ZK_ADDR,
-                                new ErrTransition(errPartitions));
-      }
-      else
-      {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
-      }
-      participants[i].syncStart();
-    }
-
-    // verify cluster
-    Map<String, Map<String, String>> errStateMap =
-        new HashMap<String, Map<String, String>>();
-    errStateMap.put("TestDB0", new HashMap<String, String>());
-    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
-    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                       clusterName,
-                                                                                                       errStateMap)));
-    Assert.assertTrue(result, "Cluster verification fails");
-    
-    // reset resource "TestDB0"
-    participants[0].setTransition(null);
-    String command =
-        "--zkSvr " + ZK_ADDR + " --resetResource " + clusterName
-            + " TestDB0";
-    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                       clusterName)));
-    Assert.assertTrue(result, "Cluster verification fails");
-    
-    // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    controller.syncStop();
-    for (int i = 0; i < 5; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
deleted file mode 100644
index 8ea4e0b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestRestartParticipant extends ZkIntegrationTestBase
-{
-  public class KillOtherTransition extends MockTransition
-  {
-    final AtomicReference<MockParticipant> _other;
-
-    public KillOtherTransition(MockParticipant other)
-    {
-      _other = new AtomicReference<MockParticipant>(other);
-    }
-
-    @Override
-    public void doTransition(Message message, NotificationContext context)
-    {
-      MockParticipant other = _other.getAndSet(null);
-      if (other != null)
-      {
-        System.err.println("Kill " + other.getInstanceName()
-            + ". Interrupted exceptions are IGNORABLE");
-        other.syncStop();
-      }
-    }
-  }
-
-  @Test()
-  public void testRestartParticipant() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    System.out.println("START testRestartParticipant at "
-        + new Date(System.currentTimeMillis()));
-
-    String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            5, // number of nodes
-                            3, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-
-    TestHelper.startController(clusterName,
-                               "controller_0",
-                               ZK_ADDR,
-                               HelixControllerMain.STANDALONE);
-    // start participants
-    for (int i = 0; i < 5; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      if (i == 4)
-      {
-        participants[i] =
-            new MockParticipant(clusterName,
-                                instanceName,
-                                ZK_ADDR,
-                                new KillOtherTransition(participants[0]));
-      }
-      else
-      {
-        participants[i] =
-            new MockParticipant(clusterName,
-                                instanceName,
-                                ZK_ADDR,
-                                null);
-//        Thread.sleep(100);
-      }
-
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-
-    // restart
-    Thread.sleep(500);
-    MockParticipant participant =
-        new MockParticipant(participants[0].getClusterName(),
-                            participants[0].getInstanceName(),
-                            ZK_ADDR,
-                            null);
-    System.err.println("Restart " + participant.getInstanceName());
-    participant.syncStart();
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-
-    System.out.println("START testRestartParticipant at "
-        + new Date(System.currentTimeMillis()));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
deleted file mode 100644
index 36eafff..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.ZKPathDataDumpTask;
-import com.linkedin.helix.util.HelixUtil;
-
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
-{
-  TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
-  public static class TestMessagingHandlerFactory implements
-      MessageHandlerFactory
-  {
-    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      return new TestMessagingHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType()
-    {
-      return "TestParticipant";
-    }
-
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-
-    }
-
-    public class TestMessagingHandler extends MessageHandler
-    {
-      public TestMessagingHandler(Message message, NotificationContext context)
-      {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException
-      {
-        HelixTaskResult result = new HelixTaskResult();
-        result.setSuccess(true);
-        String destName = _message.getTgtName();
-        synchronized (_results)
-        {
-          if (!_results.containsKey(_message.getPartitionName()))
-          {
-            _results.put(_message.getPartitionName(),
-                new ConcurrentSkipListSet<String>());
-          }
-        }
-        _results.get(_message.getPartitionName()).add(destName);
-
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type)
-      {
-        // TODO Auto-generated method stub
-
-      }
-    }
-  }
-
-  @Test()
-  public void TestSchedulerMsg() throws Exception
-  {
-    _factory._results.clear();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService()
-          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
-        .randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate",
-        msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    helixDataAccessor.createProperty(
-        keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
-        schedulerMessage);
-
-    Thread.sleep(15000);
-
-    Assert.assertEquals(_PARTITIONS, _factory._results.size());
-    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
-        MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
-        .getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
-        .get("MessageCount").equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for(String key : statusUpdate.getMapFields().keySet())
-    {
-      if(key.startsWith("MessageResult "))
-      {
-        messageResultCount ++;
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-    
-    int count = 0;
-    for (Set<String> val : _factory._results.values())
-    {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-    
-    // test the ZkPathDataDumpTask
-    String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
-        PropertyType.STATUSUPDATES_CONTROLLER);
-    List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
-    for(String subPath : subPaths)
-    {
-      String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
-      Assert.assertTrue(subsubPaths.size() > 0);
-    }
-    
-    String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
-        PropertyType.STATUSUPDATES);
-    
-    subPaths = _zkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
-    for(String subPath : subPaths)
-    {
-      String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
-      Assert.assertTrue(subsubPaths.size() > 0);
-      for(String subsubPath : subsubPaths)
-      {
-        String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
-      }
-    }
-    
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
-    dumpTask.run();
-    
-    subPaths = _zkClient.getChildren(controllerStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
-    for(String subPath : subPaths)
-    {
-      String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
-      Assert.assertTrue(subsubPaths.size() == 0);
-    }
-    
-    subPaths = _zkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
-    for(String subPath : subPaths)
-    {
-      String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
-      Assert.assertTrue(subsubPaths.size() > 0);
-      for(String subsubPath : subsubPaths)
-      {
-        String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
-      }
-    }
-  }
-  
-
-  @Test()
-  public void TestSchedulerMsg2() throws Exception
-  {
-    _factory._results.clear();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService()
-          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
-        .randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate",
-        msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-    
-    Criteria cr2 = new Criteria();
-    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
-    cr2.setInstanceName("*");
-    cr2.setSessionSpecific(false);
-    
-    class MockAsyncCallback extends AsyncCallback
-    {
-      Message _message;
-      public MockAsyncCallback()
-      {
-      }
-
-      @Override
-      public void onTimeOut()
-      {
-        // TODO Auto-generated method stub
-
-      }
-
-      @Override
-      public void onReplyMessage(Message message)
-      {
-        _message = message;
-      }
-
-    }
-    MockAsyncCallback callback = new MockAsyncCallback();
-    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
-    String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-    
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    Assert.assertEquals(_PARTITIONS, _factory._results.size());
-    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
-        MessageType.SCHEDULER_MSG.toString(), msgId);
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
-        .getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
-        .get("MessageCount").equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for(String key : statusUpdate.getMapFields().keySet())
-    {
-      if(key.startsWith("MessageResult "))
-      {
-        messageResultCount ++;
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-    
-    int count = 0;
-    for (Set<String> val : _factory._results.values())
-    {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-  }
-
-  @Test()
-  public void TestSchedulerZeroMsg() throws Exception
-  {
-    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService()
-          .registerMessageHandlerFactory(factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
-        .randomUUID().toString());
-    schedulerMessage.setTgtSessionId("*");
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), "Template");
-    msg.setTgtSessionId("*");
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_DOESNOTEXIST");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate",
-        msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    PropertyKey controllerMessageKey = keyBuilder
-        .controllerMessage(schedulerMessage.getMsgId());
-    helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
-
-    Thread.sleep(3000);
-
-    Assert.assertEquals(0, factory._results.size());
-    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
-        MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
-        .getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
-        .get("MessageCount").equals("0"));
-    int count = 0;
-    for (Set<String> val : factory._results.values())
-    {
-      count += val.size();
-    }
-    Assert.assertEquals(count, 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
deleted file mode 100644
index 5be3dab..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestSchemataSM extends ZkIntegrationTestBase
-{
-  @Test
-  public void testSchemataSM() throws Exception
-  {
-    String testName = "TestSchemataSM";
-    String clusterName = testName;
-
-    MockParticipant[] participants = new MockParticipant[5];
-//    Logger.getRootLogger().setLevel(Level.INFO);
-
-    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
-                                                         // port
-                            "localhost", // participant name prefix
-                            "TestSchemata", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            5, // number of nodes
-                            1, // replicas
-                            "STORAGE_DEFAULT_SM_SCHEMATA",
-                            true); // do rebalance
-
-    TestHelper.startController(clusterName,
-                               "controller_0",
-                               ZK_ADDR,
-                               HelixControllerMain.STANDALONE);
-    // start participants
-    for (int i = 0; i < 5; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] =
-          new MockParticipant(clusterName,
-                              instanceName,
-                              ZK_ADDR,
-                              null);
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-    for (int i = 0; i < 5; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
deleted file mode 100644
index 73d7b7d..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZkTestHelper;
-import com.linkedin.helix.ZkTestHelper.TestZkHelixManager;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
-{
-
-  public class SessionExpiryTransition extends MockTransition
-  {
-    private final AtomicBoolean _done = new AtomicBoolean();
-
-    @Override
-    public void doTransition(Message message, NotificationContext context)
-    {
-      TestZkHelixManager manager = (TestZkHelixManager)context.getManager();
-     
-      String instance = message.getTgtName();
-      String partition = message.getPartitionName();
-      if (instance.equals("localhost_12918")
-          && partition.equals("TestDB0_1")  // TestDB0_1 is SLAVE on localhost_12918
-          && _done.getAndSet(true) == false)
-      {
-        try
-        {
-          ZkTestHelper.expireSession(manager.getZkClient());
-        }
-        catch (Exception e)
-        {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-      }
-    }
-  }
- 
-  // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
-  // when there is pending messages
-  // @Test
-  public void testSessionExpiryInTransition() throws Exception
-  {
-    Logger.getRootLogger().setLevel(Level.WARN);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
-
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    MockParticipant[] participants = new MockParticipant[5];
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-                            "localhost", // participant name prefix
-                            "TestDB", // resource name prefix
-                            1, // resources
-                            10, // partitions per resource
-                            5, // number of nodes
-                            3, // replicas
-                            "MasterSlave",
-                            true); // do rebalance
-
-    // start controller
-    ClusterController controller =
-        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-    controller.syncStart();
-
-    // start participants
-    for (int i = 0; i < 5; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-      TestZkHelixManager manager =
-          new TestZkHelixManager(clusterName,
-                                 instanceName,
-                                 InstanceType.PARTICIPANT,
-                                 ZK_ADDR);
-      participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 clusterName));
-    Assert.assertTrue(result);
-
-    // clean up
-    for (int i = 0; i < 5; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    Thread.sleep(2000);
-    controller.syncStop();
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
deleted file mode 100644
index d205f89..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestStandAloneCMMain extends ZkStandAloneCMTestBase
-{
-  private static Logger logger = Logger.getLogger(TestStandAloneCMMain.class);
-
-  @Test()
-  public void testStandAloneCMMain() throws Exception
-  {
-    logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
-    for (int i = 1; i <= 2; i++)
-    {
-      String controllerName = "controller_" + i;
-      StartCMResult startResult =
-          TestHelper.startController(CLUSTER_NAME,
-                                            controllerName,
-                                            ZK_ADDR,
-                                            HelixControllerMain.STANDALONE);
-      _startCMResultMap.put(controllerName, startResult);
-    }
-
-    stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
-    boolean result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    logger.info("STOP testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
deleted file mode 100644
index d0339ea..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZkTestHelper;
-import com.linkedin.helix.ZkTestHelper.TestZkHelixManager;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase
-{
-  private static Logger LOG = Logger.getLogger(TestStandAloneCMSessionExpiry.class);
-
-  @Test()
-  public void testStandAloneCMSessionExpiry() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.DEBUG);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    System.out.println("START " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName,
-                            ZK_ADDR,
-                            12918,
-                            PARTICIPANT_PREFIX,
-                            "TestDB",
-                            1,
-                            20,
-                            5,
-                            3,
-                            "MasterSlave",
-                            true);
-
-    MockParticipant[] participants = new MockParticipant[5];
-    for (int i = 0; i < 5; i++)
-    {
-      String instanceName = "localhost_" + (12918 + i);
-      TestZkHelixManager manager =
-          new TestZkHelixManager(clusterName,
-                                 instanceName,
-                                 InstanceType.PARTICIPANT,
-                                 ZK_ADDR);
-      participants[i] = new MockParticipant(manager, null);
-      participants[i].syncStart();
-    }
-
-    TestZkHelixManager controller =
-        new TestZkHelixManager(clusterName,
-                               "controller_0",
-                               InstanceType.CONTROLLER,
-                               ZK_ADDR);
-    controller.connect();
-
-    boolean result;
-    result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                   clusterName));
-    Assert.assertTrue(result);
-
-    // participant session expiry
-    TestZkHelixManager participantToExpire = (TestZkHelixManager)participants[1].getManager();
-
-    System.out.println("Expire participant session");
-    String oldSessionId = participantToExpire.getSessionId();
-    
-    ZkTestHelper.expireSession(participantToExpire.getZkClient());
-    String newSessionId = participantToExpire.getSessionId();
-    System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-    Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
-    setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
-
-    result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                   clusterName));
-    Assert.assertTrue(result);
-
-    // controller session expiry
-    System.out.println("Expire controller session");
-    oldSessionId = controller.getSessionId();
-    ZkTestHelper.expireSession(controller.getZkClient());
-    newSessionId = controller.getSessionId();
-    System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-    Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
-
-    setupTool.addResourceToCluster(clusterName, "TestDB2", 8, "MasterSlave");
-    setupTool.rebalanceStorageCluster(clusterName, "TestDB2", 3);
-
-    result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                                   clusterName));
-    Assert.assertTrue(result);
-
-    // clean up
-    System.out.println("Clean up ...");
-    // Logger.getRootLogger().setLevel(Level.DEBUG);
-    controller.disconnect();
-    Thread.sleep(100);
-    for (int i = 0; i < 5; i++)
-    {
-      participants[i].syncStop();
-    }
-
-    System.out.println("END " + clusterName + " at "
-        + new Date(System.currentTimeMillis()));
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
deleted file mode 100644
index 9e5f380..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.mock.storage.MockJobIntf;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.MockMSStateModel;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.StateTransitionError;
-import com.linkedin.helix.participant.statemachine.Transition;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-
-public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase
-{
-  private static Logger                LOG               =
-      Logger.getLogger(TestStateTransitionTimeout.class);
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    System.out.println("START " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace))
-    {
-      _zkClient.deleteRecursive(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
-
-    // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-    
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
-    
-    // Set the timeout values
-    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
-    String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + Message.Attributes.TIMEOUT;
-    idealState.getRecord().setSimpleField(stateTransition, "300");
-    
-    String command = "-zkSvr " + ZK_ADDR + " -addResourceProperty "+ CLUSTER_NAME + " " + TEST_DB + " " + stateTransition + " 200";
-    ClusterSetup.processCommandLineArgs(command.split(" "));
-  }
-  
-  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
-  public static class TimeOutStateModel extends MockParticipant.MockMSStateModel
-  {
-    boolean _sleep = false;
-    StateTransitionError _error;
-    int _errorCallcount = 0;
-    public TimeOutStateModel(MockTransition transition, boolean sleep)
-    {
-      super(transition);
-      _sleep = sleep;
-    }
-
-    @Transition(to="SLAVE",from="OFFLINE")
-    public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
-    {
-      LOG.info("Become SLAVE from OFFLINE");
-      
-    }
-
-    @Transition(to="MASTER",from="SLAVE")
-    public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
-    {
-      LOG.info("Become MASTER from SLAVE");
-      if (_transition != null && _sleep)
-      {
-        _transition.doTransition(message, context);
-      }
-    }
-
-    @Transition(to="SLAVE",from="MASTER")
-    public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
-    {
-      LOG.info("Become SLAVE from MASTER");
-    }
-
-    @Transition(to="OFFLINE",from="SLAVE")
-    public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
-    {
-      LOG.info("Become OFFLINE from SLAVE");
-      
-    }
-
-    @Transition(to="DROPPED",from="OFFLINE")
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
-    {
-      LOG.info("Become DROPPED from OFFLINE");
-      
-    }
-    
-    public void rollbackOnError(Message message, NotificationContext context,
-        StateTransitionError error)
-    {
-      _error = error;
-      _errorCallcount ++;
-    }
-  }
-  
-  public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel>
-  {
-    Set<String> partitionsToSleep = new HashSet<String>();
-    int _sleepTime;
-    
-    public SleepStateModelFactory(int sleepTime)
-    {
-      _sleepTime = sleepTime;
-    }
-    
-    public void setPartitions(Collection<String> partitions)
-    {
-      partitionsToSleep.addAll(partitions);
-    }
-    
-    public void addPartition(String partition)
-    {
-      partitionsToSleep.add(partition);
-    }
-    
-    @Override
-    public TimeOutStateModel createNewStateModel(String stateUnitKey)
-    {
-      return new TimeOutStateModel(new MockParticipant.SleepTransition(_sleepTime), partitionsToSleep.contains(stateUnitKey));
-    }
-  }
-  
-  @Test
-  public void testStateTransitionTimeOut() throws Exception
-  {
-    Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
-    MockParticipant[] participants = new MockParticipant[NODE_NR];
-    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      SleepStateModelFactory factory = new SleepStateModelFactory(1000);
-      factories.put(instanceName, factory);
-      for(String p : idealState.getPartitionSet())
-      {
-        if(idealState.getPreferenceList(p).get(0).equals(instanceName))
-        {
-          factory.addPartition(p);
-        }
-      }
-      
-      participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
-    }
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME,
-                                   controllerName,
-                                   ZK_ADDR,
-                                   HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
-                                                                              CLUSTER_NAME));
-    Assert.assertTrue(result);
-    HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
-    
-    Builder kb = accessor.keyBuilder();
-    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
-    for(String p : idealState.getPartitionSet())
-    {
-      String idealMaster = idealState.getPreferenceList(p).get(0);
-      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
-      
-      TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
-      Assert.assertEquals(model._errorCallcount , 1);
-      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
deleted file mode 100644
index e1f0dd8..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class TestStatusUpdate extends ZkStandAloneCMTestBase
-{
-  // For now write participant StatusUpdates to log4j.
-  // TODO: Need to investigate another data channel to report to controller and re-enable
-  // this test
-  // @Test
-  public void testParticipantStatusUpdates() throws Exception
-  {
-    ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
-    Assert.assertNotNull(extViews);
-
-    for (ExternalView extView : extViews)
-    {
-      String resourceName = extView.getResourceName();
-      Set<String> partitionSet = extView.getPartitionSet();
-      for (String partition : partitionSet)
-      {
-        Map<String, String> stateMap = extView.getStateMap(partition);
-        for (String instance : stateMap.keySet())
-        {
-          String state = stateMap.get(instance);
-          StatusUpdateUtil.StatusUpdateContents statusUpdates =
-              StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
-                                                                            instance,
-                                                                            resourceName,
-                                                                            partition);
-
-          Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
-              statusUpdates.getTaskMessages();
-          List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
-          if (state.equals("MASTER"))
-          {
-            Assert.assertEquals(transitions.size() >= 2,
-                                true,
-                                "Invalid number of transitions");
-            StatusUpdateUtil.Transition lastTransition =
-                transitions.get(transitions.size() - 1);
-            StatusUpdateUtil.Transition prevTransition =
-                transitions.get(transitions.size() - 2);
-            Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
-                                StatusUpdateUtil.TaskStatus.COMPLETED,
-                                "Incomplete transition");
-            Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
-                                StatusUpdateUtil.TaskStatus.COMPLETED,
-                                "Incomplete transition");
-            Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
-            Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
-          }
-          else if (state.equals("SLAVE"))
-          {
-            Assert.assertEquals(transitions.size() >= 1,
-                                true,
-                                "Invalid number of transitions");
-            StatusUpdateUtil.Transition lastTransition =
-                transitions.get(transitions.size() - 1);
-            Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
-                                    || lastTransition.getFromState().equals("OFFLINE"),
-                                true,
-                                "Invalid transition");
-            Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
deleted file mode 100644
index accd057..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.linkedin.helix.integration;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixAdmin;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestSwapInstance extends ZkStandAloneCMTestBase
-{
-  @Test
-  public void TestSwap() throws Exception
-  {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
-    HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
-    _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
-    
-    
-    ZNRecord idealStateOld1 = new ZNRecord("TestDB");
-    ZNRecord idealStateOld2 = new ZNRecord("MyDB");
-    
-    IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-    idealStateOld1.merge(is1.getRecord());
-    
-
-    IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-    idealStateOld2.merge(is2.getRecord());
-    
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
-    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
-
-    boolean result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-    
-    String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
-    _setupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
-    
-    boolean exception = false;
-    try
-    {
-      _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-    }
-    catch(Exception e)
-    {
-      exception = true;
-    }
-    Assert.assertTrue(exception);
-    
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    _startCMResultMap.get(instanceName)._thread.interrupt();
-    Thread.sleep(1000);
-    
-    exception = false;
-    try
-    {
-      _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-    }
-    catch(Exception e)
-    {
-      e.printStackTrace();
-      exception = true;
-    }
-    Assert.assertFalse(exception);
-    StartCMResult result2 =
-        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
-    _startCMResultMap.put(instanceName2, result2);
-    
-    result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-    
-    is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-    
-    is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-    
-    for(String key : idealStateOld1.getMapFields().keySet())
-    {
-      for(String host : idealStateOld1.getMapField(key).keySet())
-      {
-        if(host.equals(instanceName))
-        {
-          Assert.assertTrue(
-          idealStateOld1.getMapField(key).get(instanceName).equals(
-          is1.getRecord().getMapField(key).get(instanceName2)));
-        }
-        else
-        {
-          Assert.assertTrue(
-              idealStateOld1.getMapField(key).get(host).equals(
-              is1.getRecord().getMapField(key).get(host)));
-        }
-      }
-    }
-    
-    for(String key : idealStateOld1.getListFields().keySet())
-    {
-      Assert.assertEquals(idealStateOld1.getListField(key).size() , is1.getRecord().getListField(key).size());
-      for(int i = 0; i < idealStateOld1.getListField(key).size(); i++)
-      {
-        String host = idealStateOld1.getListField(key).get(i);
-        String newHost = is1.getRecord().getListField(key).get(i);
-        if(host.equals(instanceName))
-        {
-          Assert.assertTrue(
-              newHost.equals(instanceName2));
-        }
-        else
-        {
-          //System.out.println(key + " " + i+ " " + host + " "+newHost);
-          //System.out.println(idealStateOld1.getListField(key));
-          //System.out.println(is1.getRecord().getListField(key));
-          
-          Assert.assertTrue(host.equals(newHost));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
deleted file mode 100644
index 1bd3387..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.josql.DataAccessorBasedTupleReader;
-import com.linkedin.helix.josql.JsqlQueryListProcessor;
-import com.linkedin.helix.josql.ZNRecordQueryProcessor;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-
-public class TestZNRecordQueryProcessorWithZK extends ZkStandAloneCMTestBase
-{
-  //@Test
-  public void testClusterQuery() throws Exception
-  {
-    HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-
-    DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(manager.getHelixDataAccessor(), manager.getClusterName());
-
-    ZNRecordQueryProcessor queryProcessor = new ZNRecordQueryProcessor();
-
-    String partition = TEST_DB + "_4";
-
-    String sql = "select T2.instance, T1.listIndex, T2.id from " +
-    "explodeList(IDEALSTATES," + partition + ") as T1 join " +
-     "LIVEINSTANCES as T2 using (T1.listVal, T2.id)";
-    List<ZNRecord> result = queryProcessor.execute(sql, tupleReader);
-    System.out.println(result);
-    Assert.assertEquals(result.size(), 3);
-  }
-
-
-  //@Test
-  public void testWildcardExpansion() throws Exception
-  {
-    HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
-    List<String> instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
-    for(String instance : instancesInCluster)
-    {
-      ZNRecord record = new ZNRecord("scnTable");
-      record.setSimpleField("k1", "v1");
-      Builder kb = accessor.keyBuilder();
-      accessor.setProperty(kb.healthReport(instance, "scnTable"), new HealthStat(record));
-    }
-
-    String path = "INSTANCES/*/HEALTHREPORT/scnTable";
-    DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(accessor, manager.getClusterName());
-    List<ZNRecord> tuples = tupleReader.get(path);
-    System.out.println(tuples);
-    Assert.assertEquals(tuples.size(), instancesInCluster.size());
-  }
-
-  @Test
-  public void testNewMasterSelection() throws Exception
-  {
-    HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
-    IdealState resourceIdealState = manager.getClusterManagmentTool().getResourceIdealState(manager.getClusterName(), TEST_DB);
-    Map<String, Map<String, Integer>> scnMap = new HashMap<String, Map<String, Integer>>();
-
-    List<String> instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
-    List<String> instances = new ArrayList<String>();
-    instances.addAll(instancesInCluster);
-    //instances.add(instancesInCluster.get(0));
-    instances.add("deadInstance");
-    System.out.println(instances.size());
-
-    int seq = 50;
-    for(String instance : instances)
-    {
-      ZNRecord scnRecord = new ZNRecord("scnTable");
-      scnRecord.setSimpleField("instance", instance);
-      for(int i = 0; i < _PARTITIONS; i++)
-      {
-        Map<String, String> scnDetails = new HashMap<String, String>();
-
-        String partition = TEST_DB + "_" + i;
-        List<String> idealStatePrefList =
-            resourceIdealState.getPreferenceList(partition);
-        String idealStateMaster = idealStatePrefList.get(0);
-
-        scnDetails.put("gen", "4");
-
-        if (instance.equals(idealStateMaster))
-        {
-          scnDetails.put("seq", "" + (seq - 25));
-        }
-        else
-        {
-          scnDetails.put("seq", "" + seq++);
-        }
-        scnRecord.setMapField(partition, scnDetails);
-      }
-
-      Builder kb = accessor.keyBuilder();
-      accessor.setProperty(kb.healthReport(instance, "scnTable"), new HealthStat(scnRecord));
-    }
-
-    ZNRecordQueryProcessor processor = new ZNRecordQueryProcessor();
-    DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(accessor, manager.getClusterName());
-
-    String scnTableQuery = "SELECT T1.instance as instance, T1.mapField as partition, T1.gen as gen, T1.seq as seq " +
-            "FROM explodeMap(`INSTANCES/*/HEALTHREPORT/scnTable`) AS T1" +
-            " JOIN LIVEINSTANCES as T2 using (T1.instance, T2.id)";
-    List<ZNRecord> scnTable = processor.execute(scnTableQuery, tupleReader);
-    tupleReader.setTempTable("scnTable", scnTable);
-
-    String rankQuery = "SELECT instance, partition, gen, seq, T1.listIndex AS instanceRank " +
-            " FROM scnTable JOIN explodeList(`IDEALSTATES/" + TEST_DB + "`) AS T1 " +
-                    "USING (scnTable.instance, T1.listVal) WHERE scnTable.partition=T1.listField";
-    List<ZNRecord> rankTable = processor.execute(rankQuery, tupleReader);
-    System.out.println(rankTable.size());
-    tupleReader.setTempTable("rankTable", rankTable);
-
-    String masterSelectionQuery = "SELECT instance, partition, instanceRank, gen, (T.maxSeq-seq) AS seqDiff, seq FROM rankTable JOIN " +
-            " (SELECT partition, max(to_number(seq)) AS maxSeq FROM rankTable GROUP BY partition) AS T USING(rankTable.partition, T.partition) " +
-            " WHERE to_number(seqDiff) < 10 " +
-            " ORDER BY partition, to_number(gen) desc, to_number(instanceRank), to_number(seqDiff)";
-
-    List<ZNRecord> masterSelectionTable = processor.execute(masterSelectionQuery, tupleReader);
-    System.out.println(masterSelectionTable.size());
-    for(ZNRecord record : masterSelectionTable)
-    {
-      System.out.println(record);
-    }
-
-    List<String> combinedQueryStringList = new ArrayList<String>();
-    combinedQueryStringList.add(scnTableQuery + JsqlQueryListProcessor.SEPARATOR+"scnTable");
-    combinedQueryStringList.add(rankQuery + JsqlQueryListProcessor.SEPARATOR+"rankTable");
-    combinedQueryStringList.add(masterSelectionQuery);
-    System.out.println();
-    List<ZNRecord> masterSelectionTable2 = JsqlQueryListProcessor.executeQueryList(manager.getHelixDataAccessor(), manager.getClusterName(), combinedQueryStringList);
-    for(ZNRecord record : masterSelectionTable2)
-    {
-      System.out.println(record);
-    }
-    Assert.assertEquals(masterSelectionTable2.size(), masterSelectionTable.size());
-    for(int i = 0;i<masterSelectionTable2.size(); i++)
-    {
-      Assert.assertTrue(masterSelectionTable2.get(i).equals(masterSelectionTable.get(i)));
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
deleted file mode 100644
index b13ad83..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Map;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.util.ZKClientPool;
-
-public class ZkIntegrationTestBase
-{
-  private static Logger         LOG                       =
-                                                              Logger.getLogger(ZkIntegrationTestBase.class);
-
-  protected static ZkServer     _zkServer;
-  protected static ZkClient     _gZkClient;
-  protected static ClusterSetup _gSetupTool;
-
-  public static final String    ZK_ADDR                   = "localhost:2183";
-  protected static final String CLUSTER_PREFIX            = "CLUSTER";
-  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
-
-  protected final String        CONTROLLER_PREFIX         = "controller";
-  protected final String        PARTICIPANT_PREFIX        = "localhost";
-
-  @BeforeSuite
-  public void beforeSuite() throws Exception
-  {
-    // TODO: use logging.properties file to config java.util.logging.Logger levels
-    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
-    topJavaLogger.setLevel(Level.WARNING);
-    
-    _zkServer = TestHelper.startZkSever(ZK_ADDR);
-    AssertJUnit.assertTrue(_zkServer != null);
-    ZKClientPool.reset();
-
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
-    _gSetupTool = new ClusterSetup(ZK_ADDR);
-  }
-
-  @AfterSuite
-  public void afterSuite()
-  {
-    ZKClientPool.reset();
-    TestHelper.stopZkServer(_zkServer);
-    _gZkClient.close();
-  }
-
-  protected String getShortClassName()
-  {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
-  }
-
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName)
-  {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null)
-    {
-      return null;
-    }
-    return leader.getInstanceName();
-  }
-
-  /**
-   * Stop current leader and returns the new leader
-   * 
-   * @param zkClient
-   * @param clusterName
-   * @param startCMResultMap
-   * @return
-   */
-  protected String stopCurrentLeader(ZkClient zkClient,
-                                     String clusterName,
-                                     Map<String, StartCMResult> startCMResultMap)
-  {
-    String leader = getCurrentLeader(zkClient, clusterName);
-    Assert.assertTrue(leader != null);
-    System.out.println("stop leader: " + leader + " in " + clusterName);
-    Assert.assertTrue(leader != null);
-
-    StartCMResult result = startCMResultMap.remove(leader);
-    Assert.assertTrue(result._manager != null);
-    result._manager.disconnect();
-
-    Assert.assertTrue(result._thread != null);
-    result._thread.interrupt();
-
-    boolean isNewLeaderElected = false;
-    String newLeader = null;
-    try
-    {
-      for (int i = 0; i < 5; i++)
-      {
-        Thread.sleep(1000);
-        newLeader = getCurrentLeader(zkClient, clusterName);
-        if (!newLeader.equals(leader))
-        {
-          isNewLeaderElected = true;
-          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
-          break;
-        }
-      }
-    }
-    catch (InterruptedException e)
-    {
-      e.printStackTrace();
-    }
-    if (isNewLeaderElected == false)
-    {
-      System.out.println("fail to elect a new leader in " + clusterName);
-    }
-    AssertJUnit.assertTrue(isNewLeaderElected);
-    return newLeader;
-  }
-
-  protected void enableHealthCheck(String clusterName)
-  {
-    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
-    new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
deleted file mode 100644
index 488f205..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-
-/**
- * 
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
-{
-  private static Logger                LOG               =
-                                                             Logger.getLogger(ZkStandAloneCMTestBase.class);
-
-  protected static final int           NODE_NR           = 5;
-  protected static final int           START_PORT        = 12918;
-  protected static final String        STATE_MODEL       = "MasterSlave";
-  protected static final String        TEST_DB           = "TestDB";
-  protected static final int           _PARTITIONS       = 20;
-
-  protected ClusterSetup               _setupTool        = null;
-  protected final String               CLASS_NAME        = getShortClassName();
-  protected final String               CLUSTER_NAME      = CLUSTER_PREFIX + "_"
-                                                             + CLASS_NAME;
-
-  protected Map<String, StartCMResult> _startCMResultMap =
-                                                             new HashMap<String, StartCMResult>();
-  protected ZkClient                   _zkClient;
-  
-  int _replica = 3;
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-//    Logger.getRootLogger().setLevel(Level.INFO);
-    System.out.println("START " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace))
-    {
-      _zkClient.deleteRecursive(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
-
-    // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
-
-    // start dummy participants
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null)
-      {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
-      }
-      else
-      {
-        StartCMResult result =
-            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME,
-                                   controllerName,
-                                   ZK_ADDR,
-                                   HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
-    
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
-                                                                              CLUSTER_NAME));
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception
-  {
-    /**
-     * shutdown order: 1) disconnect the controller 2) disconnect participants
-     */
-   
-    StartCMResult result;
-    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      if (instanceName.startsWith(CONTROLLER_PREFIX))
-      {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
-      }
-    }
-
-    Thread.sleep(100);
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
-    }
-
-    _zkClient.close();
-    // logger.info("END at " + new Date(System.currentTimeMillis()));
-    System.out.println("END " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
deleted file mode 100644
index 0b4b0fe..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.List;
-
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.controller.restlet.ZkPropertyTransferClient;
-import com.linkedin.helix.model.StatusUpdate;
-/**
- * 
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase
-{
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    ZKPropertyTransferServer.PERIOD = 500;
-    ZkPropertyTransferClient.SEND_PERIOD = 500;
-    ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
-    super.beforeClass();
-    
-    Thread.sleep(1000);
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null)
-      {
-        HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
-        Builder kb = accessor.keyBuilder();
-        List<StatusUpdate> statusUpdates = accessor.getChildValues(
-            kb.stateTransitionStatus(instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(),
-                TEST_DB));
-        Assert.assertTrue(statusUpdates.size() > 0);
-        for(StatusUpdate update : statusUpdates)
-        {
-          Assert.assertTrue(update.getRecord().getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
-          Assert.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
-        }
-      }
-    }
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception
-  {
-    super.afterClass();
-    ZKPropertyTransferServer.getInstance().shutdown();
-    ZKPropertyTransferServer.getInstance().reset();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
deleted file mode 100644
index 1dacc68..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.josql.ZNRecordJosqlFunctionHandler;
-import com.linkedin.helix.josql.ZNRecordRow;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestClusterJosqlQueryProcessor
-{
-  @Test (groups = {"unitTest"})
-  public void queryClusterDataSample() 
-  {
-    List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
-    Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
-    List<String> instances = new ArrayList<String>();
-    for(int i = 0;i<5; i++)
-    {
-      String instance = "localhost_"+(12918+i);
-      instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
-          UUID.randomUUID().toString());
-      metaData.setSimpleField("SCN", "" + (10-i));
-      liveInstances.add(metaData);
-      liveInstanceMap.put(instance, metaData);
-    }
-    
-    //liveInstances.remove(0);
-    ZNRecord externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
-        instances, 21, 3, "TestDB", "MASTER", "SLAVE");
-    
-    
-    Criteria criteria = new Criteria();
-    criteria.setInstanceName("%");
-    criteria.setResource("TestDB");
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setPartition("TestDB_2%");
-    criteria.setPartitionState("SLAVE");
-    
-    String josql = 
-      " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'" +
-      " FROM com.linkedin.helix.josql.ZNRecordRow " + 
-      " WHERE mapKey LIKE 'TestDB_2%' " +
-        " AND mapSubKey LIKE '%' " +
-        " AND mapValue LIKE 'SLAVE' " +
-        " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
-        " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
-    
-    Query josqlQuery = new Query();
-    josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
-    josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    josqlQuery.addFunctionHandler(new Integer(0));
-    try
-    {
-      josqlQuery.parse(josql);
-      QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
-      @SuppressWarnings({ "unchecked", "unused" })
-      List<Object> result = qr.getResults();
-      
-    } 
-    catch (QueryParseException e)
-    {
-      e.printStackTrace();
-    } catch (QueryExecutionException e)
-    {
-      e.printStackTrace();
-    }
-
-  }
-}