You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[42/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
deleted file mode 100644
index fa6e18e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.MockListener;
-import com.linkedin.helix.store.PropertyStore;
-
-public class TestZkClusterManager extends ZkUnitTestBase
-{
-  final String className = getShortClassName();
-
-  @Test()
-  public void testController() throws Exception
-  {
-    System.out.println("START " + className + ".testController() at " + new Date(System.currentTimeMillis()));
-    final String clusterName = CLUSTER_PREFIX + "_" + className + "_controller";
-
-    // basic test
-    if (_gZkClient.exists("/" + clusterName))
-    {
-      _gZkClient.deleteRecursive("/" + clusterName);
-    }
-
-    ZKHelixManager controller = new ZKHelixManager(clusterName, null,
-                                                   InstanceType.CONTROLLER,
-                                                   ZK_ADDR);
-    try
-    {
-      controller.connect();
-      Assert.fail("Should throw HelixException if initial cluster structure is not setup");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-
-    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-
-    controller.connect();
-    AssertJUnit.assertTrue(controller.isConnected());
-    controller.connect();
-    AssertJUnit.assertTrue(controller.isConnected());
-
-    MockListener listener = new MockListener();
-    listener.reset();
-
-    try
-    {
-      controller.addControllerListener(null);
-      Assert.fail("Should throw HelixException");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-
-    controller.addControllerListener(listener);
-    AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
-    controller.removeListener(listener);
-
-    PropertyStore<ZNRecord> store = controller.getPropertyStore();
-    ZNRecord record = new ZNRecord("node_1");
-    store.setProperty("node_1", record);
-    record = store.getProperty("node_1");
-    AssertJUnit.assertEquals("node_1", record.getId());
-
-    controller.getMessagingService();
-    controller.getHealthReportCollector();
-    controller.getClusterManagmentTool();
-
-    controller.handleNewSession();
-    controller.disconnect();
-    AssertJUnit.assertFalse(controller.isConnected());
-
-    System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test()
-  public void testAdministrator() throws Exception
-  {
-    System.out.println("START " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
-    final String clusterName = CLUSTER_PREFIX + "_" + className + "_admin";
-
-    // basic test
-    if (_gZkClient.exists("/" + clusterName))
-    {
-      _gZkClient.deleteRecursive("/" + clusterName);
-    }
-
-    ZKHelixManager admin = new ZKHelixManager(clusterName, null,
-                                              InstanceType.ADMINISTRATOR,
-                                              ZK_ADDR);
-
-    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-
-    admin.connect();
-    AssertJUnit.assertTrue(admin.isConnected());
-
-    HelixAdmin adminTool = admin.getClusterManagmentTool();
-    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
-        .forResource("testResource").forPartition("testPartition").build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("pKey1", "pValue1");
-    properties.put("pKey2", "pValue2");
-    adminTool.setConfig(scope, properties);
-
-    properties = adminTool.getConfig(scope, TestHelper.setOf("pKey1", "pKey2"));
-    Assert.assertEquals(properties.size(), 2);
-    Assert.assertEquals(properties.get("pKey1"), "pValue1");
-    Assert.assertEquals(properties.get("pKey2"), "pValue2");
-
-    admin.disconnect();
-    AssertJUnit.assertFalse(admin.isConnected());
-
-    System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
deleted file mode 100644
index 1bd2d27..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.StateModelDefinition;
-
-public class TestZkHelixAdmin extends ZkUnitTestBase
-{
-  @Test()
-  public void testZkHelixAdmin()
-  {
-    System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
-
-    final String clusterName = getShortClassName();
-    if (_gZkClient.exists("/" + clusterName))
-    {
-      _gZkClient.deleteRecursive("/" + clusterName);
-    }
-
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
-    tool.addCluster(clusterName, true);
-    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
-    tool.addCluster(clusterName, true);
-    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
-
-    List<String> list = tool.getClusters();
-    AssertJUnit.assertTrue(list.size() > 0);
-
-    try
-    {
-      tool.addCluster(clusterName, false);
-      Assert.fail("should fail if add an already existing cluster");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-
-    InstanceConfig config = new InstanceConfig("host1_9999");
-    config.setHostName("host1");
-    config.setPort("9999");
-    tool.addInstance(clusterName, config);
-    tool.enableInstance(clusterName, "host1_9999", true);
-    String path = PropertyPathConfig.getPath(PropertyType.INSTANCES,
-        clusterName, "host1_9999");
-    AssertJUnit.assertTrue(_gZkClient.exists(path));
-
-    try
-    {
-      tool.addInstance(clusterName, config);
-      Assert.fail("should fail if add an alredy-existing instance");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    config = tool.getInstanceConfig(clusterName, "host1_9999");
-    AssertJUnit.assertEquals(config.getId(), "host1_9999");
-
-    tool.dropInstance(clusterName, config);
-    try
-    {
-      tool.getInstanceConfig(clusterName, "host1_9999");
-      Assert.fail("should fail if get a non-existent instance");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    try
-    {
-      tool.dropInstance(clusterName, config);
-      Assert.fail("should fail if drop on a non-existent instance");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    try
-    {
-      tool.enableInstance(clusterName, "host1_9999", false);
-      Assert.fail("should fail if enable a non-existent instance");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    ZNRecord stateModelRecord = new ZNRecord("id1");
-    try
-    {
-      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
-          stateModelRecord));
-      path = PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
-          clusterName, "id1");
-      AssertJUnit.assertTrue(_gZkClient.exists(path));
-      Assert.fail("should fail");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    try
-    {
-      tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
-          stateModelRecord));
-      Assert.fail("should fail if add an already-existing state model");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    list = tool.getStateModelDefs(clusterName);
-    AssertJUnit.assertEquals(list.size(), 0);
-
-    try
-    {
-      tool.addResource(clusterName, "resource", 10,
-          "nonexistStateModelDef");
-      Assert
-          .fail("should fail if add a resource without an existing state model");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    try
-    {
-      tool.addResource(clusterName, "resource", 10, "id1");
-      Assert.fail("should fail");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    list = tool.getResourcesInCluster(clusterName);
-    AssertJUnit.assertEquals(list.size(), 0);
-    try
-    {
-      tool.addResource(clusterName, "resource", 10, "id1");
-      Assert.fail("should fail");
-    } catch (HelixException e)
-    {
-      // OK
-    }
-    list = tool.getResourcesInCluster(clusterName);
-    AssertJUnit.assertEquals(list.size(), 0);
-
-    ExternalView resourceExternalView = tool.getResourceExternalView(
-        clusterName, "resource");
-    AssertJUnit.assertNull(resourceExternalView);
-
-    // test config support
-    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
-        .forResource("testResource").forPartition("testPartition").build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("pKey1", "pValue1");
-    properties.put("pKey2", "pValue2");
-    
-    // make sure calling set/getConfig() many times will not drain zkClient resources
-    // int nbOfZkClients = ZkClient.getNumberOfConnections();
-    for (int i = 0; i < 100; i++)
-    {
-      tool.setConfig(scope, properties);
-      Map<String, String> newProperties = tool.getConfig(scope, properties.keySet());
-      Assert.assertEquals(newProperties.size(), 2);
-      Assert.assertEquals(newProperties.get("pKey1"), "pValue1");
-      Assert.assertEquals(newProperties.get("pKey2"), "pValue2");
-    }
-    // Assert.assertTrue(ZkClient.getNumberOfConnections() - nbOfZkClients < 5);
-
-    System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
deleted file mode 100644
index 31c5d25..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.model.Message;
-
-public class TestAsyncCallback
-{
-  class AsyncCallbackSample extends AsyncCallback
-  {
-    int _onTimeOutCalled = 0;
-    int _onReplyMessageCalled = 0;
-    @Override
-    public void onTimeOut()
-    {
-      // TODO Auto-generated method stub
-      _onTimeOutCalled ++;
-    }
-
-    @Override
-    public void onReplyMessage(Message message)
-    {
-      _onReplyMessageCalled++;
-    }
-  }
-
-  @Test()
-  public void testAsyncCallback() throws Exception
-  {
-    System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
-    AsyncCallbackSample callback = new AsyncCallbackSample();
-    AssertJUnit.assertFalse(callback.isInterrupted());
-    AssertJUnit.assertFalse(callback.isTimedOut());
-    AssertJUnit.assertTrue(callback.getMessageReplied().size() == 0);
-
-    int nMsgs = 5;
-
-    List<Message> messageSent = new ArrayList<Message>();
-    for(int i = 0;i < nMsgs; i++)
-    {
-      messageSent.add(new Message("Test", UUID.randomUUID().toString()));
-    }
-
-    callback.setMessagesSent(messageSent);
-
-    for(int i = 0;i < nMsgs; i++)
-    {
-      AssertJUnit.assertFalse(callback.isDone());
-      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
-    }
-    AssertJUnit.assertTrue(callback.isDone());
-
-    AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
-
-    sleep(50);
-    callback = new AsyncCallbackSample();
-    callback.setMessagesSent(messageSent);
-    callback.setTimeout(1000);
-    sleep(50);
-    callback.startTimer();
-    AssertJUnit.assertFalse(callback.isTimedOut());
-    for(int i = 0;i < nMsgs - 1; i++)
-    {
-      sleep(50);
-      AssertJUnit.assertFalse(callback.isDone());
-      AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
-      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
-    }
-    sleep(1000);
-    AssertJUnit.assertTrue(callback.isTimedOut());
-    AssertJUnit.assertTrue(callback._onTimeOutCalled == 1 );
-    AssertJUnit.assertFalse(callback.isDone());
-
-    callback = new AsyncCallbackSample();
-    callback.setMessagesSent(messageSent);
-    callback.setTimeout(1000);
-    callback.startTimer();
-    sleep(50);
-    AssertJUnit.assertFalse(callback.isTimedOut());
-    for(int i = 0;i < nMsgs; i++)
-    {
-      AssertJUnit.assertFalse(callback.isDone());
-      sleep(50);
-      AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
-      callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
-    }
-    AssertJUnit.assertTrue(callback.isDone());
-    sleep(1300);
-    AssertJUnit.assertFalse(callback.isTimedOut());
-    AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
-    System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
-  }
-
-  void sleep(int time)
-  {
-    try
-    {
-      Thread.sleep(time);
-    }
-    catch(Exception e)
-    {
-      System.out.println(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
deleted file mode 100644
index e44e707..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import org.testng.annotations.Test;
-import org.testng.AssertJUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.AsyncCallbackService;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.TestHelixTaskExecutor.MockClusterManager;
-import com.linkedin.helix.model.Message;
-
-public class TestAsyncCallbackSvc
-{
-  class MockHelixManager extends Mocks.MockManager
-  {
-    public String getSessionId()
-    {
-      return "123";
-    }
-  }
-  
-  class TestAsyncCallback extends AsyncCallback
-  {
-    HashSet<String> _repliedMessageId = new HashSet<String>();
-    @Override
-    public void onTimeOut()
-    {
-      // TODO Auto-generated method stub
-      
-    }
-
-    @Override
-    public void onReplyMessage(Message message)
-    {
-      // TODO Auto-generated method stub
-      _repliedMessageId.add(message.getMsgId());
-    }
-    
-  }
-  @Test(groups =
-  { "unitTest" })
-  public void testAsyncCallbackSvc() throws Exception
-  {
-    AsyncCallbackService svc = new AsyncCallbackService();
-    HelixManager manager = new MockHelixManager();
-    NotificationContext changeContext = new NotificationContext(manager);
-    
-    Message msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
-    msg.setTgtSessionId(manager.getSessionId());
-    try
-    {
-      MessageHandler aHandler = svc.createHandler(msg, changeContext);
-    }
-    catch(HelixException e)
-    {
-      AssertJUnit.assertTrue(e.getMessage().indexOf(msg.getMsgId())!= -1);
-    }
-    Message msg2 = new Message("RandomType", UUID.randomUUID().toString());
-    msg2.setTgtSessionId(manager.getSessionId());
-    try
-    {
-      MessageHandler aHandler = svc.createHandler(msg2, changeContext);
-    }
-    catch(HelixException e)
-    {
-      AssertJUnit.assertTrue(e.getMessage().indexOf(msg2.getMsgId())!= -1);
-    }
-    Message msg3 = new Message(svc.getMessageType(), UUID.randomUUID().toString());
-    msg3.setTgtSessionId(manager.getSessionId());
-    msg3.setCorrelationId("wfwegw");
-    try
-    {
-      MessageHandler aHandler = svc.createHandler(msg3, changeContext);
-    }
-    catch(HelixException e)
-    {
-      AssertJUnit.assertTrue(e.getMessage().indexOf(msg3.getMsgId())!= -1);
-    }
-    
-    TestAsyncCallback callback = new TestAsyncCallback();
-    String corrId = UUID.randomUUID().toString();
-    svc.registerAsyncCallback(corrId, new TestAsyncCallback());
-    svc.registerAsyncCallback(corrId, callback);
-    
-    List<Message> msgSent = new ArrayList<Message>();
-    msgSent.add(new Message("Test", UUID.randomUUID().toString()));
-    callback.setMessagesSent(msgSent);
-    
-    msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
-    msg.setTgtSessionId("*");
-    msg.setCorrelationId(corrId);
-    
-    MessageHandler aHandler = svc.createHandler(msg, changeContext);
-    Map<String, String> resultMap = new HashMap<String, String>();
-    aHandler.handleMessage();
-    
-    AssertJUnit.assertTrue(callback.isDone());
-    AssertJUnit.assertTrue(callback._repliedMessageId.contains(msg.getMsgId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
deleted file mode 100644
index 16bfc03..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestDefaultMessagingService
-{
-  class MockHelixManager extends Mocks.MockManager
-  {
-    class MockDataAccessor extends Mocks.MockAccessor
-    {
-      
-      @Override
-      public <T extends HelixProperty> T getProperty(PropertyKey key)
-      {
-        
-        PropertyType type = key.getType();
-        if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
-        {
-          return (T) new ExternalView(_externalView);
-        }
-        return null;
-      }
-
-      @Override
-      public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
-      {
-        PropertyType type = key.getType();
-        List<T> result = new ArrayList<T>();
-        Class<? extends HelixProperty> clazz = key.getTypeClass();
-        if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
-        {
-          HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
-          result.add((T) typedInstance);
-          return result;
-        }
-        else if(type == PropertyType.LIVEINSTANCES)
-        {
-          return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
-        }
-
-        return result;
-      }
-    }
-
-    HelixDataAccessor _accessor = new MockDataAccessor();
-    ZNRecord _externalView;
-    List<String> _instances;
-    List<ZNRecord> _liveInstances;
-    String _db = "DB";
-    int _replicas = 3;
-    int _partitions = 50;
-
-    public MockHelixManager()
-    {
-      _liveInstances = new ArrayList<ZNRecord>();
-      _instances = new ArrayList<String>();
-      for(int i = 0;i<5; i++)
-      {
-        String instance = "localhost_"+(12918+i);
-        _instances.add(instance);
-        ZNRecord metaData = new ZNRecord(instance);
-        metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
-            UUID.randomUUID().toString());
-        _liveInstances.add(metaData);
-      }
-      _externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
-          _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
-
-    }
-
-    @Override
-    public boolean isConnected()
-    {
-      return true;
-    }
-
-    @Override
-    public HelixDataAccessor getHelixDataAccessor()
-    {
-      return _accessor;
-    }
-
-
-    @Override
-    public String getInstanceName()
-    {
-      return "localhost_12919";
-    }
-
-    @Override
-    public InstanceType getInstanceType()
-    {
-      return InstanceType.PARTICIPANT;
-    }
-  }
-
-  class TestMessageHandlerFactory implements MessageHandlerFactory
-  {
-    class TestMessageHandler extends MessageHandler
-    {
-
-      public TestMessageHandler(Message message, NotificationContext context)
-      {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException
-      {
-        HelixTaskResult result = new HelixTaskResult();
-        result.setSuccess(true);
-        return result;
-      }
-
-      @Override
-      public void onError( Exception e, ErrorCode code, ErrorType type)
-      {
-        // TODO Auto-generated method stub
-        
-      }
-    }
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      // TODO Auto-generated method stub
-      return new TestMessageHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType()
-    {
-      // TODO Auto-generated method stub
-      return "TestingMessageHandler";
-    }
-
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-
-    }
-  }
-
-  @Test()
-  public void TestMessageSend()
-  {
-    HelixManager manager = new MockHelixManager();
-    DefaultMessagingService svc = new DefaultMessagingService(manager);
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setInstanceName("localhost_12919");
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setSelfExcluded(true);
-
-    Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-    AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
-
-    recipientCriteria.setSelfExcluded(false);
-    AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
-
-
-    recipientCriteria.setSelfExcluded(false);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource("DB");
-    recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
-
-    recipientCriteria.setSelfExcluded(true);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource("DB");
-    recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
-
-    recipientCriteria.setSelfExcluded(true);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource("DB");
-    recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
-
-    recipientCriteria.setSelfExcluded(true);
-    recipientCriteria.setInstanceName("localhost_12920");
-    recipientCriteria.setResource("DB");
-    recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
-
-
-    recipientCriteria.setSelfExcluded(true);
-    recipientCriteria.setInstanceName("localhost_12920");
-    recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
-    recipientCriteria.setResource("DB");
-    recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
deleted file mode 100644
index 08c2f86..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.HashSet;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBase;
-import com.linkedin.helix.integration.TestMessagingService.TestMessagingHandlerFactory.TestMessagingHandler;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.HelixTaskExecutor;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.Message;
-
-public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
-{
-  public static class TestMessagingHandlerFactory implements MessageHandlerFactory
-  {
-    public static HashSet<String> _processedMsgIds = new HashSet<String>();
-    
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      return null;
-    }
-    
-    @Override
-    public String getMessageType()
-    {
-      return "TestMsg";
-    }
-    
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-    }
-    
-  }
-  
-  public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
-  {
-    public static HashSet<String> _processedMsgIds = new HashSet<String>();
-    
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      return null;
-    }
-    
-    @Override
-    public String getMessageType()
-    {
-      return "TestMsg2";
-    }
-    
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-    }
-    
-  }
-  @Test
-  public void TestThreadPoolSizeConfig()
-  {
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
-    
-    ConfigAccessor accessor = manager.getConfigAccessor();
-    ConfigScope scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
-    accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
-    
-    scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
-    accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
-    
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      
-      _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
-      _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
-      
-    
-    }
-    
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      
-      DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
-      HelixTaskExecutor helixExecutor = svc.getExecutor();
-      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
-      
-      ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
-      if(i != 0)
-      {
-        
-        Assert.assertEquals(8, executor.getMaximumPoolSize());
-      }
-      else
-      {
-        Assert.assertEquals(12, executor.getMaximumPoolSize());
-      }
-      Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
deleted file mode 100644
index 2a4619e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
+++ /dev/null
@@ -1,584 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-
-public class TestHelixTaskExecutor
-{
-  public static class MockClusterManager extends Mocks.MockManager
-  {
-    @Override
-    public String getSessionId()
-    {
-      return "123";
-    }
-  }
-
-  class TestMessageHandlerFactory implements MessageHandlerFactory
-  {
-    int _handlersCreated = 0;
-    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
-    class TestMessageHandler extends MessageHandler
-    {
-      public TestMessageHandler(Message message, NotificationContext context)
-      {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException
-      {
-        HelixTaskResult result = new HelixTaskResult();
-        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
-        Thread.currentThread().sleep(100);
-        result.setSuccess(true);
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type)
-      {
-        // TODO Auto-generated method stub
-        
-      }
-    }
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      // TODO Auto-generated method stub
-      if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
-      {
-        throw new HelixException("Test Message handler exception, can ignore");
-      }
-      _handlersCreated++;
-      return new TestMessageHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType()
-    {
-      // TODO Auto-generated method stub
-      return "TestingMessageHandler";
-    }
-
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-
-    }
-  }
-
-  class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
-  {
-    @Override
-    public String getMessageType()
-    {
-      // TODO Auto-generated method stub
-      return "TestingMessageHandler2";
-    }
-    
-  }
-
-  class CancellableHandlerFactory implements MessageHandlerFactory
-  {
-
-    int _handlersCreated = 0;
-    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
-    ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
-    ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
-    class CancellableHandler extends MessageHandler
-    {
-      public CancellableHandler(Message message, NotificationContext context)
-      {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-      public boolean _interrupted = false;
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException
-      {
-        HelixTaskResult result = new HelixTaskResult();
-        int sleepTimes = 15;
-        if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
-        {
-          sleepTimes = 10;
-        }
-        _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
-        try
-        {
-          for (int i = 0; i < sleepTimes; i++)
-          {
-            Thread.sleep(100);
-          }
-        } 
-        catch (InterruptedException e)
-        {
-          _interrupted = true;
-          _timedOutMsgIds.put(_message.getMsgId(), "");
-          result.setInterrupted(true);
-          if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
-          {
-            _message.getRecord().setSimpleField("Cancelcount", "1");
-          }
-          else
-          {
-            int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
-            _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
-          }
-          throw e;
-        }
-        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
-        result.setSuccess(true);
-        return result;
-      }
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type)
-      {
-        // TODO Auto-generated method stub
-        _message.getRecord().setSimpleField("exception", e.getMessage());
-      }
-    }
-    @Override
-    public MessageHandler createHandler(Message message,
-        NotificationContext context)
-    {
-      // TODO Auto-generated method stub
-      _handlersCreated++;
-      return new CancellableHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType()
-    {
-      // TODO Auto-generated method stub
-      return "Cancellable";
-    }
-
-    @Override
-    public void reset()
-    {
-      // TODO Auto-generated method stub
-      _handlersCreated = 0;
-      _processedMsgIds.clear();
-       _processingMsgIds.clear();
-      _timedOutMsgIds.clear();
-    }
-  }
-
-  @Test ()
-  public void testNormalMsgExecution() throws InterruptedException
-  {
-    System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
-    NotificationContext changeContext = new NotificationContext(manager);
-    List<Message> msgList = new ArrayList<Message>();
-
-    int nMsgs1 = 5;
-    for(int i = 0; i < nMsgs1; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId(manager.getSessionId());
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msg.setCorrelationId(UUID.randomUUID().toString());
-      msgList.add(msg);
-    }
-
-
-    int nMsgs2 = 6;
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId(manager.getSessionId());
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msg.setCorrelationId(UUID.randomUUID().toString());
-      msgList.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-
-    Thread.sleep(1000);
-
-    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
-    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
-    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
-    AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
-
-    for(Message record : msgList)
-    {
-      AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
-      AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
-
-    }
-    System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
-  }
-
-  @Test ()
-  public void testUnknownTypeMsgExecution() throws InterruptedException
-  {
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-
-    NotificationContext changeContext = new NotificationContext(manager);
-    List<Message> msgList = new ArrayList<Message>();
-
-    int nMsgs1 = 5;
-    for(int i = 0; i < nMsgs1; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId(manager.getSessionId());
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msgList.add(msg);
-    }
-
-
-    int nMsgs2 = 4;
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId(manager.getSessionId());
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msgList.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-
-    Thread.sleep(1000);
-
-    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
-    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
-    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
-    AssertJUnit.assertTrue(factory2._handlersCreated == 0);
-
-    for(Message message : msgList)
-    {
-      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
-      {
-        AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
-      }
-    }
-  }
-
-
-  @Test ()
-  public void testMsgSessionId() throws InterruptedException
-  {
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
-    NotificationContext changeContext = new NotificationContext(manager);
-    List<Message> msgList = new ArrayList<Message>();
-
-    int nMsgs1 = 5;
-    for(int i = 0; i < nMsgs1; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("*");
-      msg.setTgtName("");
-      msgList.add(msg);
-    }
-
-
-    int nMsgs2 = 4;
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("some other session id");
-      msg.setTgtName("");
-      msgList.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-
-    Thread.sleep(1000);
-
-    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
-    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
-    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
-    AssertJUnit.assertTrue(factory2._handlersCreated == 0);
-
-    for(Message message : msgList)
-    {
-      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
-      {
-        AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
-      }
-    }
-  }
-
-  @Test()
-  public void testCreateHandlerException() throws InterruptedException
-  {
-    System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-    
-
-    NotificationContext changeContext = new NotificationContext(manager);
-    List<Message> msgList = new ArrayList<Message>();
-
-    int nMsgs1 = 5;
-    for(int i = 0; i < nMsgs1; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId(manager.getSessionId());
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msg.setCorrelationId(UUID.randomUUID().toString());
-      msgList.add(msg);
-    }
-    Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-    exceptionMsg.setTgtSessionId(manager.getSessionId());
-    exceptionMsg.setMsgSubType("EXCEPTION");
-    exceptionMsg.setTgtName("Localhost_1123");
-    exceptionMsg.setSrcName("127.101.1.23_2234");
-    exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
-    msgList.add(exceptionMsg);
-    
-    executor.onMessage("someInstance", msgList, changeContext);
-
-    Thread.sleep(1000);
-
-    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
-    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
-
-    AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
-    System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
-  }
-
-  @Test ()
-  public void testTaskCancellation() throws InterruptedException
-  {
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    CancellableHandlerFactory factory = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    NotificationContext changeContext = new NotificationContext(manager);
-    List<Message> msgList = new ArrayList<Message>();
-
-    int nMsgs1 = 0;
-    for(int i = 0; i < nMsgs1; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("*");
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msgList.add(msg);
-    }
-
-    List<Message> msgListToCancel = new ArrayList<Message>();
-    int nMsgs2 = 4;
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("*");
-      msgList.add(msg);
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msgListToCancel.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-    Thread.sleep(500);
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      executor.cancelTask(msgListToCancel.get(i), changeContext);
-    }
-    Thread.sleep(1500);
-
-    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
-    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
-
-    AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
-
-    for(Message message : msgList)
-    {
-      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
-      {
-        AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
-      }
-    }
-  }
-
-
-  @Test ()
-  public void testShutdown() throws InterruptedException
-  {
-     System.out.println("START TestCMTaskExecutor.testShutdown()");
-     HelixTaskExecutor executor = new HelixTaskExecutor();
-      HelixManager manager = new MockClusterManager();
-
-      TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
-      executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-      TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-      executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
-      CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
-      executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
-      int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
-      List<Message> msgList = new ArrayList<Message>();
-
-      for(int i = 0; i < nMsg1; i++)
-      {
-        Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-        msg.setTgtSessionId("*");
-        msg.setTgtName("Localhost_1123");
-        msg.setSrcName("127.101.1.23_2234");
-        msgList.add(msg);
-      }
-
-      for(int i = 0; i < nMsg2; i++)
-      {
-        Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
-        msg.setTgtSessionId("*");
-        msgList.add(msg);
-        msg.setTgtName("Localhost_1123");
-        msg.setSrcName("127.101.1.23_2234");
-        msgList.add(msg);
-      }
-
-      for(int i = 0; i < nMsg3; i++)
-      {
-        Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
-        msg.setTgtSessionId("*");
-        msgList.add(msg);
-        msg.setTgtName("Localhost_1123");
-        msg.setSrcName("127.101.1.23_2234");
-        msgList.add(msg);
-      }
-      NotificationContext changeContext = new NotificationContext(manager);
-      executor.onMessage("some", msgList, changeContext);
-      Thread.sleep(500);
-      for(ExecutorService svc : executor._threadpoolMap.values())
-      {
-        Assert.assertFalse(svc.isShutdown());
-      }
-      Assert.assertTrue(factory._processedMsgIds.size() > 0);
-      executor.shutDown();
-      for(ExecutorService svc : executor._threadpoolMap.values())
-      {
-        Assert.assertTrue(svc.isShutdown());
-      }
-      System.out.println("END TestCMTaskExecutor.testShutdown()");
-  }
-  
-  @Test ()
-  public void testRetryCount() throws InterruptedException
-  {
-    String p = "test_";
-    System.out.println(p.substring(p.lastIndexOf('_')+1));
-    HelixTaskExecutor executor = new HelixTaskExecutor();
-    HelixManager manager = new MockClusterManager();
-
-    CancellableHandlerFactory factory = new CancellableHandlerFactory();
-    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-    NotificationContext changeContext = new NotificationContext(manager);
-
-    List<Message> msgList = new ArrayList<Message>();
-    int nMsgs2 = 4;
-    // Test the case in which retry = 0
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("*");
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msg.setExecutionTimeout((i+1) * 600);
-      msgList.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-    
-    Thread.sleep(4000);
-
-    AssertJUnit.assertTrue(factory._handlersCreated ==  nMsgs2);
-    AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
-    //AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
-    for(int i = 0; i<nMsgs2 - 2; i++)
-    {
-      if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
-      {
-        AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
-        AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
-      }
-    }
-    factory.reset();
-    msgList.clear();
-    // Test the case that the message are executed for the second time
-    nMsgs2 = 4;
-    for(int i = 0; i < nMsgs2; i++)
-    {
-      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
-      msg.setTgtSessionId("*");
-      msg.setTgtName("Localhost_1123");
-      msg.setSrcName("127.101.1.23_2234");
-      msg.setExecutionTimeout((i+1) * 600);
-      msg.setRetryCount(1);
-      msgList.add(msg);
-    }
-    executor.onMessage("someInstance", msgList, changeContext);
-    Thread.sleep(3500);
-    AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
-    AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
-    AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
-    AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
-    AssertJUnit.assertTrue(executor._taskMap.size() == 0);
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
deleted file mode 100644
index 3c62a1e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBase;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
-{
-  @Test
-  public void TestThreadPoolSizeConfig()
-  {
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
-    ConfigAccessor accessor = manager.getConfigAccessor();
-    ConfigScope scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
-    accessor.set(scope, HelixTaskExecutor.MAX_THREADS, ""+12);
-    
-    _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
-    
-    boolean result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-    
-    long taskcount = 0; 
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      
-      DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
-      HelixTaskExecutor helixExecutor = svc.getExecutor();
-      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
-      Assert.assertEquals(12, executor.getMaximumPoolSize());
-      taskcount += executor.getCompletedTaskCount();
-      Assert.assertTrue(executor.getCompletedTaskCount() > 0);
-    }
-    Assert.assertEquals(taskcount, 64 * 4);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
deleted file mode 100644
index 7138171..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.ExternalView;
-
-public class ConsumerAdapter implements ExternalViewChangeListener
-{
-
-  HelixManager relayHelixManager;
-  DataAccessor relayClusterClient;
-  private final ConcurrentHashMap<String, RelayConsumer> relayConsumers;
-  private final ConcurrentHashMap<String, RelayConfig> relayConfigs;
-  private static Logger logger = Logger.getLogger(ConsumerAdapter.class);
-
-  public ConsumerAdapter(String instanceName, String zkServer,
-      String clusterName) throws Exception
-  {
-    relayConsumers = new ConcurrentHashMap<String, RelayConsumer>();
-    relayConfigs = new ConcurrentHashMap<String, RelayConfig>();
-
-//    relayClusterManager = ClusterManagerFactory.getZKBasedManagerForSpectator(
-//        clusterName, zkServer);
-    relayHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
-                                                                    null,
-                                                                    InstanceType.SPECTATOR,
-                                                                    zkServer);
-
-    relayHelixManager.connect();
-    relayHelixManager.addExternalViewChangeListener(this);
-
-  }
-
-  private RelayConfig getRelayConfig(ZNRecord externalView, String partition)
-  {
-    LinkedHashMap<String, String> relayList = (LinkedHashMap<String, String>) externalView
-        .getMapField(partition);
-
-    if (relayList == null)
-      return null;
-
-    return new RelayConfig(relayList);
-
-  }
-
-  @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-      NotificationContext changeContext)
-  {
-    logger.info("onExternalViewChange invoked");
-
-    for (ExternalView subview : externalViewList)
-    {
-      Map<String, Map<String, String>> partitions = subview.getRecord().getMapFields();
-
-      for (Entry<String, Map<String, String>> partitionConsumer : partitions
-          .entrySet())
-      {
-        String partition = partitionConsumer.getKey();
-        Map<String, String> relayList = partitionConsumer.getValue();
-        RelayConfig relayConfig = new RelayConfig(relayList);
-        relayConfigs.put(partition, relayConfig);
-        RelayConsumer consumer = relayConsumers.get(partition);
-        if (consumer != null)
-        {
-          consumer.setConfig(relayConfig);
-        }
-      }
-    }
-  }
-
-  public void start()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  public RelayConsumer getNewRelayConsumer(String dbName, String partition)
-      throws Exception
-  {
-    RelayConsumer consumer = new RelayConsumer(null, partition);
-
-    if (relayConsumers.putIfAbsent(partition, consumer) != null)
-    {
-      throw new Exception("Existing consumer");
-    }
-    logger.info("created new consumer for partition" + partition);
-
-    RelayConfig relayConfig = relayConfigs.get(partition);
-    if (relayConfig != null)
-    {
-      consumer.setConfig(relayConfig);
-    }
-
-    return consumer;
-  }
-
-  public void removeConsumer(String partition) throws Exception
-  {
-    if (relayConsumers.remove(partition) == null)
-    {
-      throw new Exception("Non Existing consumer for partition " + partition);
-    }
-    logger.info("Removed consumer for partition " + partition);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
deleted file mode 100644
index f75c213..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-/**
- * Relay assignment built from a map of relay instance name to relay state.
- * Instances whose state is "MASTER" or "SLAVE" are retained, in the iteration
- * order of the supplied map; entries with any other (or null) state are
- * ignored.
- */
-public class RelayConfig
-{
-  String partitionId;
-  private final List<String> masterRelays = new ArrayList<String>();
-  private final List<String> slaveRelays = new ArrayList<String>();
-
-  /**
-   * @param relayList map of relay instance name to its state; must not be null
-   */
-  public RelayConfig(Map<String, String> relayList)
-  {
-    for (Entry<String, String> entry : relayList.entrySet())
-    {
-      String relayInstance = entry.getKey();
-      String relayState = entry.getValue();
-
-      // Constant-first equals: a null state is skipped instead of raising a
-      // NullPointerException.
-      if ("MASTER".equals(relayState))
-      {
-        masterRelays.add(relayInstance);
-      }
-      else if ("SLAVE".equals(relayState))
-      {
-        slaveRelays.add(relayInstance);
-      }
-    }
-  }
-
-  /**
-   * Not implemented.
-   *
-   * @return always null
-   */
-  public String getClusterName()
-  {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  /**
-   * Not implemented.
-   *
-   * @return always null
-   */
-  public String getzkServer()
-  {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  /**
-   * @return the first instance seen in state "MASTER", or null if there is
-   *         none
-   */
-  public String getMaster()
-  {
-    if (masterRelays.isEmpty())
-    {
-      return null;
-    }
-
-    return masterRelays.get(0);
-  }
-
-  /**
-   * @return a new list holding all master relays followed by all slave
-   *         relays; modifying it does not affect this config
-   */
-  public List<String> getRelays()
-  {
-    List<String> relays =
-        new ArrayList<String>(masterRelays.size() + slaveRelays.size());
-    relays.addAll(masterRelays);
-    relays.addAll(slaveRelays);
-
-    return relays;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
deleted file mode 100644
index 4f9b649..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-
-/**
- * Mock consumer bound to one partition. It only logs what it would do against
- * the relay configuration it currently holds; no data is actually read.
- */
-public class RelayConsumer
-{
-  HelixManager relayHelixManager;
-  DataAccessor relayClusterClient;
-  private final String partition;
-  private RelayConfig currentRelay;
-  private static final Logger logger = Logger.getLogger(RelayConsumer.class);
-
-  /**
-   * @param relayConfig initial relay configuration; may be null
-   * @param partition   name of the partition this consumer serves
-   */
-  public RelayConsumer(RelayConfig relayConfig, String partition)
-  {
-    this.partition = partition;
-    this.currentRelay = relayConfig;
-  }
-
-  /**
-   * Logs that the consumer stops listening; does nothing when no relay
-   * configuration is set.
-   */
-  public void stop()
-  {
-    if (currentRelay != null)
-    {
-      logger.info("RelayConsumer stopping listening from relay "
-          + currentRelay.getMaster());
-    }
-  }
-
-  /**
-   * Not implemented.
-   *
-   * @return always false
-   */
-  public boolean isPointingTo(RelayConfig relayConfig)
-  {
-    return false;
-  }
-
-  /**
-   * Logs that the consumer starts listening; does nothing when no relay
-   * configuration is set.
-   */
-  public void start()
-  {
-    if (currentRelay != null)
-    {
-      logger.info("RelayConsumer starting listening from relay "
-          + currentRelay.getMaster());
-    }
-  }
-
-  /*
-   * This is required at relayConsumer to reach out relays which are hosting
-   * data for slaved partitions. Currently an empty placeholder.
-   */
-  void getRelaysForPartition(Integer partitionId)
-  {
-
-  }
-
-  /**
-   * @return always 0 in this mock
-   */
-  public long getHwm()
-  {
-    // TODO this is supposed to return the last checkpoint from this
-    // consumer
-    return 0;
-  }
-
-  /**
-   * @return the partition name given at construction
-   */
-  public String getPartition()
-  {
-    return partition;
-  }
-
-  /**
-   * @return the current {@link RelayConfig} (typed as Object), or null if
-   *         none is set
-   */
-  public Object getCurrentRelay()
-  {
-    return currentRelay;
-  }
-
-  /**
-   * Replaces the relay configuration used by this consumer.
-   *
-   * @param relayConfig new configuration; null clears the current one
-   */
-  public synchronized void setConfig(RelayConfig relayConfig)
-  {
-    currentRelay = relayConfig;
-    // Guard the log line: a null config must not fail after the field has
-    // already been updated.
-    String master = (relayConfig == null) ? null : relayConfig.getMaster();
-    logger.info("Setting config to relay " + master);
-  }
-
-  /**
-   * Logs one "read" per relay of the current configuration. Returns after the
-   * first log line when no configuration is set.
-   */
-  public void flush()
-  {
-    logger.info("Consumer flushing for partition " + partition);
-
-    // Read the field once so the null check and the iteration see the same
-    // configuration.
-    RelayConfig relayConfig = currentRelay;
-    if (relayConfig == null)
-    {
-      return;
-    }
-
-    // getRelays() builds a new list on every call, so fetch it only once.
-    java.util.List<String> relays = relayConfig.getRelays();
-    if (relays == null)
-    {
-      return;
-    }
-
-    for (String relay : relays)
-    {
-      logger.info("Reading (flush) from relay " + relay);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
deleted file mode 100644
index 0541f4b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.linkedin.helix.mock.controller;
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.participant.DistClusterControllerStateModelFactory;
-import com.linkedin.helix.participant.StateMachineEngine;
-
-/**
- * Test helper that runs a Helix controller on its own thread.
- * <p>
- * {@link #syncStart()} starts the thread and blocks until the connect attempt
- * has finished; {@link #syncStop()} signals the thread to disconnect and
- * blocks until the disconnect has finished. A failed connect is only logged,
- * so {@link #syncStart()} returns in that case too.
- */
-public class ClusterController extends Thread
-{
-  private static final Logger  LOG                      =
-                                                            Logger.getLogger(ClusterController.class);
-
-  private final CountDownLatch _startCountDown          = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown           = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-  private final String         _controllerMode;
-  private final String         _zkAddr;
-
-  // volatile: cleared by the controller thread in run() and read by the
-  // threads calling getManager()/syncStop().
-  private volatile HelixManager _manager;
-
-  /**
-   * Creates a controller in STANDALONE mode.
-   */
-  public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
-  {
-    this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
-  }
-
-  /**
-   * @param clusterName    name of the cluster to control
-   * @param controllerName instance name of this controller
-   * @param zkAddr         ZooKeeper address passed to the manager factory
-   * @param controllerMode HelixControllerMain.STANDALONE or
-   *                       HelixControllerMain.DISTRIBUTED (string form)
-   * @throws IllegalArgumentException if the mode is neither of the two
-   */
-  public ClusterController(String clusterName,
-                           String controllerName,
-                           String zkAddr,
-                           String controllerMode) throws Exception
-  {
-    _controllerMode = controllerMode;
-    _zkAddr = zkAddr;
-
-    if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
-    {
-      _manager =
-          HelixManagerFactory.getZKHelixManager(clusterName,
-                                                controllerName,
-                                                InstanceType.CONTROLLER,
-                                                zkAddr);
-    }
-    else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
-    {
-      _manager =
-          HelixManagerFactory.getZKHelixManager(clusterName,
-                                                controllerName,
-                                                InstanceType.CONTROLLER_PARTICIPANT,
-                                                zkAddr);
-
-    }
-    else
-    {
-      throw new IllegalArgumentException("Controller mode: " + controllerMode
-          + " NOT recognized");
-    }
-  }
-
-  /**
-   * @return the underlying manager, or null once the controller thread has
-   *         finished
-   */
-  public HelixManager getManager()
-  {
-    return _manager;
-  }
-
-  /**
-   * Signals the controller thread to disconnect and waits until it has done
-   * so. Returns immediately if the manager was already released.
-   * <p>
-   * Note: the completion latch is only released by {@link #run()}, so this
-   * call blocks if the thread was never started.
-   */
-  public void syncStop()
-  {
-    if (_manager == null)
-    {
-      LOG.warn("manager already stopped");
-      return;
-    }
-
-    _stopCountDown.countDown();
-    try
-    {
-      _waitStopFinishCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
-      LOG.warn("Interrupted while waiting for controller to stop", e);
-      // Preserve the interrupt for the caller instead of swallowing it.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Starts the controller thread and waits until its connect attempt has
-   * finished (successfully or not).
-   */
-  public void syncStart()
-  {
-    // TODO: prevent start multiple times
-
-    super.start();
-    try
-    {
-      _startCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
-      LOG.warn("Interrupted while waiting for controller to start", e);
-      // Preserve the interrupt for the caller instead of swallowing it.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Connects the manager (registering the "LeaderStandby" state model factory
-   * first in DISTRIBUTED mode), waits for the stop signal, then disconnects.
-   */
-  @Override
-  public void run()
-  {
-    try
-    {
-      try
-      {
-        if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
-        {
-          _manager.connect();
-        }
-        else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
-        {
-          DistClusterControllerStateModelFactory stateModelFactory =
-              new DistClusterControllerStateModelFactory(_zkAddr);
-
-          StateMachineEngine stateMach = _manager.getStateMachineEngine();
-          stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
-          _manager.connect();
-        }
-      }
-      catch (Exception e)
-      {
-        LOG.error("Failed to connect controller manager", e);
-      }
-      finally
-      {
-        // Release syncStart() even when connect failed, then park until
-        // syncStop() is called.
-        _startCountDown.countDown();
-        _stopCountDown.await();
-      }
-    }
-    catch (Exception e)
-    {
-      // Not re-interrupting here: this thread proceeds straight to
-      // disconnect and then terminates.
-      LOG.error("Controller thread failed while waiting for stop signal", e);
-    }
-    finally
-    {
-      // Lock on a local reference rather than on the field being cleared.
-      final HelixManager manager = _manager;
-      try
-      {
-        synchronized (manager)
-        {
-          manager.disconnect();
-        }
-      }
-      finally
-      {
-        // Always release syncStop(), even if disconnect() throws; otherwise
-        // the caller would block forever.
-        _manager = null;
-        _waitStopFinishCountDown.countDown();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
deleted file mode 100644
index d67bd40..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.controller;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.util.HelixUtil;
-
-public class MockController
-{
-  private final ZkClient client;
-  private final String srcName;
-  private final String clusterName;
-
-  public MockController(String src, String zkServer, String cluster)
-  {
-    srcName = src;
-    clusterName = cluster;
-    client = new ZkClient(zkServer);
-    client.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  void sendMessage(String msgId, String instanceName, String fromState,
-      String toState, String partitionKey, int partitionId)
-      throws InterruptedException, JsonGenerationException,
-      JsonMappingException, IOException
-  {
-    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-    message.setMsgId(msgId);
-    message.setSrcName(srcName);
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setFromState(fromState);
-    message.setToState(toState);
-    // message.setPartitionId(partitionId);
-    message.setPartitionName(partitionKey);
-
-    String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
-        + message.getId();
-    ObjectMapper mapper = new ObjectMapper();
-    StringWriter sw = new StringWriter();
-    mapper.writeValueUsingView(sw, message, Message.class);
-    System.out.println(sw.toString());
-    client.delete(path);
-
-    Thread.sleep(10000);
-    ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
-        instanceName));
-    message.setTgtSessionId(record.getSimpleField(
-        LiveInstanceProperty.SESSION_ID.toString()).toString());
-    client.createPersistent(path, message);
-  }
-
-  public void createExternalView(List<String> instanceNames, int partitions,
-      int replicas, String dbName, long randomSeed)
-  {
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
-                                                                     replicas, dbName, randomSeed));
-
-    accessor.setProperty(keyBuilder.externalView(dbName), externalView);
-  }
-
-  public ZNRecord computeRoutingTable(List<String> instanceNames,
-      int partitions, int replicas, String dbName, long randomSeed)
-  {
-    assert (instanceNames.size() > replicas);
-    Collections.sort(instanceNames);
-
-    ZNRecord result = new ZNRecord(dbName);
-
-    Map<String, Object> externalView = new TreeMap<String, Object>();
-
-    List<Integer> partitionList = new ArrayList<Integer>(partitions);
-    for (int i = 0; i < partitions; i++)
-    {
-      partitionList.add(new Integer(i));
-    }
-    Random rand = new Random(randomSeed);
-    // Shuffle the partition list
-    Collections.shuffle(partitionList, rand);
-
-    for (int i = 0; i < partitionList.size(); i++)
-    {
-      int partitionId = partitionList.get(i);
-      Map<String, String> partitionAssignment = new TreeMap<String, String>();
-      int masterNode = i % instanceNames.size();
-      // the first in the list is the node that contains the master
-      partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
-
-      // for the jth replica, we put it on (masterNode + j) % nodes-th
-      // node
-      for (int j = 1; j <= replicas; j++)
-      {
-        partitionAssignment
-            .put(instanceNames.get((masterNode + j) % instanceNames.size()),
-                "SLAVE");
-      }
-      String partitionName = dbName + ".partition-" + partitionId;
-      result.setMapField(partitionName, partitionAssignment);
-    }
-    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
deleted file mode 100644
index 5d2318b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.controller;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
-/**
- * Stand-alone driver that exercises two {@link MockController}s against a
- * ZooKeeper at localhost:2181.
- */
-public class MockControllerProcess
-{
-
-  /**
-   * Publishes an external view for the relay cluster, walks one storage
-   * partition through offline->slave->master->slave, republishes the
-   * external view with a different seed, and finally takes the partition
-   * offline. A 10 second pause follows every message.
-   *
-   * @param args ignored
-   * @throws IOException
-   * @throws JsonMappingException
-   * @throws JsonGenerationException
-   * @throws InterruptedException
-   */
-  public static void main(String[] args) throws JsonGenerationException,
-      JsonMappingException, InterruptedException, IOException
-  {
-    final String controllerName = "cm-instance-0";
-    final String zkAddress = "localhost:2181";
-    final String storageNode = "localhost_8900";
-    final String database = "EspressoDB";
-    final String partition = "EspressoDB.partition-0";
-    final long pauseMillis = 10000;
-
-    MockController storage =
-        new MockController(controllerName, zkAddress, "storage-cluster");
-    MockController relay =
-        new MockController(controllerName, zkAddress, "relay-cluster");
-
-    // relay0 .. relay4
-    ArrayList<String> relayNames = new ArrayList<String>();
-    for (int i = 0; i < 5; i++)
-    {
-      relayNames.add("relay" + i);
-    }
-
-    relay.createExternalView(relayNames, 10, 2, database, 0);
-
-    // Messages to initiate offline->slave->master->slave transitions
-    storage.sendMessage("TestMessageId1", storageNode, "Offline", "Slave",
-        partition, 0);
-    Thread.sleep(pauseMillis);
-
-    storage.sendMessage("TestMessageId2", storageNode, "Slave", "Master",
-        partition, 0);
-    Thread.sleep(pauseMillis);
-
-    storage.sendMessage("TestMessageId3", storageNode, "Master", "Slave",
-        partition, 0);
-    Thread.sleep(pauseMillis);
-
-    // Change the external view to trigger the consumer to listen from
-    // another relay
-    relay.createExternalView(relayNames, 10, 2, database, 10);
-
-    storage.sendMessage("TestMessageId4", storageNode, "Slave", "Offline",
-        partition, 0);
-    Thread.sleep(pauseMillis);
-  }
-
-}