Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[38/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
new file mode 100644
index 0000000..f03d44e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -0,0 +1,243 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestDefaultMessagingService
+{
+  class MockHelixManager extends Mocks.MockManager
+  {
+    class MockDataAccessor extends Mocks.MockAccessor
+    {
+      
+      @Override
+      public <T extends HelixProperty> T getProperty(PropertyKey key)
+      {
+        
+        PropertyType type = key.getType();
+        if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
+        {
+          return (T) new ExternalView(_externalView);
+        }
+        return null;
+      }
+
+      @Override
+      public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+      {
+        PropertyType type = key.getType();
+        List<T> result = new ArrayList<T>();
+        Class<? extends HelixProperty> clazz = key.getTypeClass();
+        if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
+        {
+          HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
+          result.add((T) typedInstance);
+          return result;
+        }
+        else if(type == PropertyType.LIVEINSTANCES)
+        {
+          return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
+        }
+
+        return result;
+      }
+    }
+
+    HelixDataAccessor _accessor = new MockDataAccessor();
+    ZNRecord _externalView;
+    List<String> _instances;
+    List<ZNRecord> _liveInstances;
+    String _db = "DB";
+    int _replicas = 3;
+    int _partitions = 50;
+
+    public MockHelixManager()
+    {
+      _liveInstances = new ArrayList<ZNRecord>();
+      _instances = new ArrayList<String>();
+      for(int i = 0;i<5; i++)
+      {
+        String instance = "localhost_"+(12918+i);
+        _instances.add(instance);
+        ZNRecord metaData = new ZNRecord(instance);
+        metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+            UUID.randomUUID().toString());
+        _liveInstances.add(metaData);
+      }
+      _externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
+          _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
+
+    }
+
+    @Override
+    public boolean isConnected()
+    {
+      return true;
+    }
+
+    @Override
+    public HelixDataAccessor getHelixDataAccessor()
+    {
+      return _accessor;
+    }
+
+
+    @Override
+    public String getInstanceName()
+    {
+      return "localhost_12919";
+    }
+
+    @Override
+    public InstanceType getInstanceType()
+    {
+      return InstanceType.PARTICIPANT;
+    }
+  }
+
+  class TestMessageHandlerFactory implements MessageHandlerFactory
+  {
+    class TestMessageHandler extends MessageHandler
+    {
+
+      public TestMessageHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError( Exception e, ErrorCode code, ErrorType type)
+      {
+        // TODO Auto-generated method stub
+        
+      }
+    }
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      // TODO Auto-generated method stub
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+  @Test()
+  public void TestMessageSend()
+  {
+    HelixManager manager = new MockHelixManager();
+    DefaultMessagingService svc = new DefaultMessagingService(manager);
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setInstanceName("localhost_12919");
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setSelfExcluded(true);
+
+    Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+    AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
+
+    recipientCriteria.setSelfExcluded(false);
+    AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
+
+
+    recipientCriteria.setSelfExcluded(false);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
+
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("localhost_12920");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("localhost_12920");
+    recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
new file mode 100644
index 0000000..cdd72de
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -0,0 +1,124 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.HashSet;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.TestMessagingService.TestMessagingHandlerFactory.TestMessagingHandler;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.Message;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
+{
+  public static class TestMessagingHandlerFactory implements MessageHandlerFactory
+  {
+    public static HashSet<String> _processedMsgIds = new HashSet<String>();
+    
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      return null;
+    }
+    
+    @Override
+    public String getMessageType()
+    {
+      return "TestMsg";
+    }
+    
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+    }
+    
+  }
+  
+  public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
+  {
+    public static HashSet<String> _processedMsgIds = new HashSet<String>();
+    
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      return null;
+    }
+    
+    @Override
+    public String getMessageType()
+    {
+      return "TestMsg2";
+    }
+    
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+    }
+    
+  }
+  @Test
+  public void TestThreadPoolSizeConfig()
+  {
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    
+    ConfigAccessor accessor = manager.getConfigAccessor();
+    ConfigScope scope =
+        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
+    accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
+    
+    scope =
+        new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
+    accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
+    
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      
+      _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
+      _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
+      
+    
+    }
+    
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      
+      DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
+      HelixTaskExecutor helixExecutor = svc.getExecutor();
+      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
+      
+      ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
+      if(i != 0)
+      {
+        
+        Assert.assertEquals(8, executor.getMaximumPoolSize());
+      }
+      else
+      {
+        Assert.assertEquals(12, executor.getMaximumPoolSize());
+      }
+      Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
new file mode 100644
index 0000000..aa4ecc8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -0,0 +1,588 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestHelixTaskExecutor
+{
+  public static class MockClusterManager extends Mocks.MockManager
+  {
+    @Override
+    public String getSessionId()
+    {
+      return "123";
+    }
+  }
+
+  class TestMessageHandlerFactory implements MessageHandlerFactory
+  {
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
+    class TestMessageHandler extends MessageHandler
+    {
+      public TestMessageHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        Thread.sleep(100);
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type)
+      {
+        // TODO Auto-generated method stub
+        
+      }
+    }
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      // TODO Auto-generated method stub
+      if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
+      {
+        throw new HelixException("Test Message handler exception, can ignore");
+      }
+      _handlersCreated++;
+      return new TestMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler";
+    }
+
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+  class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
+  {
+    @Override
+    public String getMessageType()
+    {
+      // TODO Auto-generated method stub
+      return "TestingMessageHandler2";
+    }
+    
+  }
+
+  class CancellableHandlerFactory implements MessageHandlerFactory
+  {
+
+    int _handlersCreated = 0;
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
+    ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
+    ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
+    class CancellableHandler extends MessageHandler
+    {
+      public CancellableHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+      public boolean _interrupted = false;
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        HelixTaskResult result = new HelixTaskResult();
+        int sleepTimes = 15;
+        if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
+        {
+          sleepTimes = 10;
+        }
+        _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        try
+        {
+          for (int i = 0; i < sleepTimes; i++)
+          {
+            Thread.sleep(100);
+          }
+        } 
+        catch (InterruptedException e)
+        {
+          _interrupted = true;
+          _timedOutMsgIds.put(_message.getMsgId(), "");
+          result.setInterrupted(true);
+          if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
+          {
+            _message.getRecord().setSimpleField("Cancelcount", "1");
+          }
+          else
+          {
+            int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
+            _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
+          }
+          throw e;
+        }
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        result.setSuccess(true);
+        return result;
+      }
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type)
+      {
+        // TODO Auto-generated method stub
+        _message.getRecord().setSimpleField("exception", e.getMessage());
+      }
+    }
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      // TODO Auto-generated method stub
+      _handlersCreated++;
+      return new CancellableHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      // TODO Auto-generated method stub
+      return "Cancellable";
+    }
+
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+      _handlersCreated = 0;
+      _processedMsgIds.clear();
+       _processingMsgIds.clear();
+      _timedOutMsgIds.clear();
+    }
+  }
+
+  @Test ()
+  public void testNormalMsgExecution() throws InterruptedException
+  {
+    System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs1 = 5;
+    for(int i = 0; i < nMsgs1; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      msgList.add(msg);
+    }
+
+
+    int nMsgs2 = 6;
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      msgList.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(1000);
+
+    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
+    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+    AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
+
+    for(Message record : msgList)
+    {
+      AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
+      AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
+
+    }
+    System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
+  }
+
+  @Test ()
+  public void testUnknownTypeMsgExecution() throws InterruptedException
+  {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs1 = 5;
+    for(int i = 0; i < nMsgs1; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msgList.add(msg);
+    }
+
+
+    int nMsgs2 = 4;
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msgList.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(1000);
+
+    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
+    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+    AssertJUnit.assertTrue(factory2._handlersCreated == 0);
+
+    for(Message message : msgList)
+    {
+      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+      {
+        AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
+      }
+    }
+  }
+
+
+  @Test ()
+  public void testMsgSessionId() throws InterruptedException
+  {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+    executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs1 = 5;
+    for(int i = 0; i < nMsgs1; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("*");
+      msg.setTgtName("");
+      msgList.add(msg);
+    }
+
+
+    int nMsgs2 = 4;
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("some other session id");
+      msg.setTgtName("");
+      msgList.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(1000);
+
+    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+    AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
+    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+    AssertJUnit.assertTrue(factory2._handlersCreated == 0);
+
+    for(Message message : msgList)
+    {
+      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+      {
+        AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
+      }
+    }
+  }
+
+  @Test()
+  public void testCreateHandlerException() throws InterruptedException
+  {
+    System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+    
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs1 = 5;
+    for(int i = 0; i < nMsgs1; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      msgList.add(msg);
+    }
+    Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+    exceptionMsg.setTgtSessionId(manager.getSessionId());
+    exceptionMsg.setMsgSubType("EXCEPTION");
+    exceptionMsg.setTgtName("Localhost_1123");
+    exceptionMsg.setSrcName("127.101.1.23_2234");
+    exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
+    msgList.add(exceptionMsg);
+    
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(1000);
+
+    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+
+    AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
+    System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
+  }
+
+  @Test ()
+  public void testTaskCancellation() throws InterruptedException
+  {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    CancellableHandlerFactory factory = new CancellableHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs1 = 0;
+    for(int i = 0; i < nMsgs1; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("*");
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msgList.add(msg);
+    }
+
+    List<Message> msgListToCancel = new ArrayList<Message>();
+    int nMsgs2 = 4;
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("*");
+      msgList.add(msg);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msgListToCancel.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+    Thread.sleep(500);
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      executor.cancelTask(msgListToCancel.get(i), changeContext);
+    }
+    Thread.sleep(1500);
+
+    AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+    AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
+
+    AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
+
+    for(Message message : msgList)
+    {
+      if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+      {
+        AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
+      }
+    }
+  }
+
+
+  @Test ()
+  public void testShutdown() throws InterruptedException
+  {
+     System.out.println("START TestCMTaskExecutor.testShutdown()");
+     HelixTaskExecutor executor = new HelixTaskExecutor();
+      HelixManager manager = new MockClusterManager();
+
+      TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+      executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+      TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+      executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+      CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
+      executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
+      int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
+      List<Message> msgList = new ArrayList<Message>();
+
+      for(int i = 0; i < nMsg1; i++)
+      {
+        Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+        msg.setTgtSessionId("*");
+        msg.setTgtName("Localhost_1123");
+        msg.setSrcName("127.101.1.23_2234");
+        msgList.add(msg);
+      }
+
+      for(int i = 0; i < nMsg2; i++)
+      {
+        Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+        msg.setTgtSessionId("*");
+        msgList.add(msg);
+        msg.setTgtName("Localhost_1123");
+        msg.setSrcName("127.101.1.23_2234");
+        msgList.add(msg);
+      }
+
+      for(int i = 0; i < nMsg3; i++)
+      {
+        Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
+        msg.setTgtSessionId("*");
+        msgList.add(msg);
+        msg.setTgtName("Localhost_1123");
+        msg.setSrcName("127.101.1.23_2234");
+        msgList.add(msg);
+      }
+      NotificationContext changeContext = new NotificationContext(manager);
+      executor.onMessage("some", msgList, changeContext);
+      Thread.sleep(500);
+      for(ExecutorService svc : executor._threadpoolMap.values())
+      {
+        Assert.assertFalse(svc.isShutdown());
+      }
+      Assert.assertTrue(factory._processedMsgIds.size() > 0);
+      executor.shutDown();
+      for(ExecutorService svc : executor._threadpoolMap.values())
+      {
+        Assert.assertTrue(svc.isShutdown());
+      }
+      System.out.println("END TestCMTaskExecutor.testShutdown()");
+  }
+  
+  @Test ()
+  public void testRetryCount() throws InterruptedException
+  {
+    String p = "test_";
+    System.out.println(p.substring(p.lastIndexOf('_')+1));
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    CancellableHandlerFactory factory = new CancellableHandlerFactory();
+    executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+
+    List<Message> msgList = new ArrayList<Message>();
+    int nMsgs2 = 4;
+    // Test the case in which retry = 0
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("*");
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setExecutionTimeout((i+1) * 600);
+      msgList.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+    
+    Thread.sleep(4000);
+
+    AssertJUnit.assertTrue(factory._handlersCreated ==  nMsgs2);
+    AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
+    //AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
+    for(int i = 0; i<nMsgs2 - 2; i++)
+    {
+      if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
+      {
+        AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
+        AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
+      }
+    }
+    factory.reset();
+    msgList.clear();
+    // Test the case that the message are executed for the second time
+    nMsgs2 = 4;
+    for(int i = 0; i < nMsgs2; i++)
+    {
+      Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+      msg.setTgtSessionId("*");
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setExecutionTimeout((i+1) * 600);
+      msg.setRetryCount(1);
+      msgList.add(msg);
+    }
+    executor.onMessage("someInstance", msgList, changeContext);
+    Thread.sleep(3500);
+    AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
+    AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
+    AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
+    AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
+    AssertJUnit.assertTrue(executor._taskMap.size() == 0);
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
new file mode 100644
index 0000000..bf82c26
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -0,0 +1,51 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
+{
+  @Test
+  public void TestThreadPoolSizeConfig()
+  {
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    ConfigAccessor accessor = manager.getConfigAccessor();
+    ConfigScope scope =
+        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
+    accessor.set(scope, HelixTaskExecutor.MAX_THREADS, ""+12);
+    
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+    
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    
+    long taskcount = 0; 
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      
+      DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
+      HelixTaskExecutor helixExecutor = svc.getExecutor();
+      ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
+      Assert.assertEquals(12, executor.getMaximumPoolSize());
+      taskcount += executor.getCompletedTaskCount();
+      Assert.assertTrue(executor.getCompletedTaskCount() > 0);
+    }
+    Assert.assertEquals(taskcount, 64 * 4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
new file mode 100644
index 0000000..8e8b1fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.log4j.Logger;
+
+
+public class ConsumerAdapter implements ExternalViewChangeListener
+{
+
+  HelixManager relayHelixManager;
+  DataAccessor relayClusterClient;
+  private final ConcurrentHashMap<String, RelayConsumer> relayConsumers;
+  private final ConcurrentHashMap<String, RelayConfig> relayConfigs;
+  private static Logger logger = Logger.getLogger(ConsumerAdapter.class);
+
+  public ConsumerAdapter(String instanceName, String zkServer,
+      String clusterName) throws Exception
+  {
+    relayConsumers = new ConcurrentHashMap<String, RelayConsumer>();
+    relayConfigs = new ConcurrentHashMap<String, RelayConfig>();
+
+//    relayClusterManager = ClusterManagerFactory.getZKBasedManagerForSpectator(
+//        clusterName, zkServer);
+    relayHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                                    null,
+                                                                    InstanceType.SPECTATOR,
+                                                                    zkServer);
+
+    relayHelixManager.connect();
+    relayHelixManager.addExternalViewChangeListener(this);
+
+  }
+
+  private RelayConfig getRelayConfig(ZNRecord externalView, String partition)
+  {
+    LinkedHashMap<String, String> relayList = (LinkedHashMap<String, String>) externalView
+        .getMapField(partition);
+
+    if (relayList == null)
+      return null;
+
+    return new RelayConfig(relayList);
+
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext)
+  {
+    logger.info("onExternalViewChange invoked");
+
+    for (ExternalView subview : externalViewList)
+    {
+      Map<String, Map<String, String>> partitions = subview.getRecord().getMapFields();
+
+      for (Entry<String, Map<String, String>> partitionConsumer : partitions
+          .entrySet())
+      {
+        String partition = partitionConsumer.getKey();
+        Map<String, String> relayList = partitionConsumer.getValue();
+        RelayConfig relayConfig = new RelayConfig(relayList);
+        relayConfigs.put(partition, relayConfig);
+        RelayConsumer consumer = relayConsumers.get(partition);
+        if (consumer != null)
+        {
+          consumer.setConfig(relayConfig);
+        }
+      }
+    }
+  }
+
+  public void start()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  public RelayConsumer getNewRelayConsumer(String dbName, String partition)
+      throws Exception
+  {
+    RelayConsumer consumer = new RelayConsumer(null, partition);
+
+    if (relayConsumers.putIfAbsent(partition, consumer) != null)
+    {
+      throw new Exception("Existing consumer");
+    }
+    logger.info("created new consumer for partition" + partition);
+
+    RelayConfig relayConfig = relayConfigs.get(partition);
+    if (relayConfig != null)
+    {
+      consumer.setConfig(relayConfig);
+    }
+
+    return consumer;
+  }
+
+  public void removeConsumer(String partition) throws Exception
+  {
+    if (relayConsumers.remove(partition) == null)
+    {
+      throw new Exception("Non Existing consumer for partition " + partition);
+    }
+    logger.info("Removed consumer for partition " + partition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
new file mode 100644
index 0000000..f156ee7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class RelayConfig
+{
+  String partitionId;
+  private ArrayList<String> masterRelays;
+  private ArrayList<String> slaveRelays;
+
+  public RelayConfig(Map<String, String> relayList)
+  {
+
+    masterRelays = new ArrayList<String>();
+    slaveRelays = new ArrayList<String>();
+
+    for (Entry<String, String> entry : relayList.entrySet())
+    {
+      String relayInstance = entry.getKey();
+      String relayState = entry.getValue();
+
+      if (relayState.equals("MASTER"))
+      {
+        masterRelays.add(relayInstance);
+      } else if (relayState.equals("SLAVE"))
+      {
+        slaveRelays.add(relayInstance);
+      }
+    }
+  }
+
+  public String getClusterName()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public String getzkServer()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public String getMaster()
+  {
+    if (masterRelays.isEmpty())
+      return null;
+
+    return masterRelays.get(0);
+  }
+
+  public List<String> getRelays()
+  {
+    List<String> relays = new ArrayList<String>();
+    relays.addAll(masterRelays);
+    relays.addAll(slaveRelays);
+
+    return relays;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
new file mode 100644
index 0000000..c4ea610
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.log4j.Logger;
+
+
+public class RelayConsumer
+{
+  HelixManager relayHelixManager;
+  DataAccessor relayClusterClient;
+  private final String partition;
+  private RelayConfig currentRelay;
+  private static Logger logger = Logger.getLogger(RelayConsumer.class);
+
+  public RelayConsumer(RelayConfig relayConfig, String partition)
+  {
+    this.partition = partition;
+    this.currentRelay = relayConfig;
+  }
+
+  public void stop()
+  {
+    if (currentRelay != null)
+    {
+      logger.info("RelayConsumer stopping listening from relay "
+          + currentRelay.getMaster());
+    }
+  }
+
+  public boolean isPointingTo(RelayConfig relayConfig)
+  {
+    return false;
+  }
+
+  public void start()
+  {
+    if (currentRelay != null)
+    {
+      logger.info("RelayConsumer starting listening from relay "
+          + currentRelay.getMaster());
+    }
+  }
+
+  /*
+   * This is required at the RelayConsumer to reach out to relays which are
+   * hosting data for slaved partitions.
+   */
+  void getRelaysForPartition(Integer partitionId)
+  {
+
+  }
+
+  public long getHwm()
+  {
+    // TODO this is supposed to return the last checkpoint from this
+    // consumer
+    return 0;
+  }
+
+  public String getPartition()
+  {
+    // TODO Auto-generated method stub
+    return partition;
+  }
+
+  public Object getCurrentRelay()
+  {
+    // TODO Auto-generated method stub
+    return currentRelay;
+  }
+
+  public synchronized void setConfig(RelayConfig relayConfig)
+  {
+    // TODO Auto-generated method stub
+    currentRelay = relayConfig;
+    logger.info("Setting config to relay " + relayConfig.getMaster());
+  }
+
+  public void flush()
+  {
+    assert (currentRelay != null);
+    logger.info("Consumer flushing for partition " + partition);
+    if (currentRelay == null || currentRelay.getRelays() == null)
+      return;
+
+    for (String relay : currentRelay.getRelays())
+    {
+      logger.info("Reading (flush) from relay " + relay);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
new file mode 100644
index 0000000..a06f86a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
@@ -0,0 +1,152 @@
+package org.apache.helix.mock.controller;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class ClusterController extends Thread
+{
+  private static Logger        LOG                      =
+                                                            Logger.getLogger(ClusterController.class);
+
+  private final CountDownLatch _startCountDown          = new CountDownLatch(1);
+  private final CountDownLatch _stopCountDown           = new CountDownLatch(1);
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+  private final String         _controllerMode;
+  private final String         _zkAddr;
+
+  private HelixManager   _manager;
+
+  public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
+  {
+    this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
+  }
+
+  public ClusterController(String clusterName,
+                           String controllerName,
+                           String zkAddr,
+                           String controllerMode) throws Exception
+  {
+    _controllerMode = controllerMode;
+    _zkAddr = zkAddr;
+
+    if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
+    {
+      _manager =
+          HelixManagerFactory.getZKHelixManager(clusterName,
+                                                controllerName,
+                                                InstanceType.CONTROLLER,
+                                                zkAddr);
+    }
+    else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
+    {
+      _manager =
+          HelixManagerFactory.getZKHelixManager(clusterName,
+                                                controllerName,
+                                                InstanceType.CONTROLLER_PARTICIPANT,
+                                                zkAddr);
+
+    }
+    else
+    {
+      throw new IllegalArgumentException("Controller mode: " + controllerMode
+          + " NOT recoginized");
+    }
+  }
+
+  public HelixManager getManager()
+  {
+    return _manager;
+  }
+
+  public void syncStop()
+  {
+    if (_manager == null)
+    {
+      LOG.warn("manager already stopped");
+      return;
+    }
+
+    _stopCountDown.countDown();
+    try
+    {
+      _waitStopFinishCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public void syncStart()
+  {
+    // TODO: prevent start multiple times
+    
+    super.start();
+    try
+    {
+      _startCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      try
+      {
+        if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
+        {
+          _manager.connect();
+        }
+        else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
+        {
+          DistClusterControllerStateModelFactory stateModelFactory =
+              new DistClusterControllerStateModelFactory(_zkAddr);
+
+          StateMachineEngine stateMach = _manager.getStateMachineEngine();
+          stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
+          _manager.connect();
+        }
+      }
+      catch (Exception e)
+      {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      finally
+      {
+        _startCountDown.countDown();
+        _stopCountDown.await();
+      }
+    }
+    catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    finally
+    {
+      synchronized (_manager)
+      {
+        _manager.disconnect();
+        _manager = null;
+      }
+      _waitStopFinishCountDown.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
new file mode 100644
index 0000000..6f087e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.controller;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+public class MockController
+{
+  private final ZkClient client;
+  private final String srcName;
+  private final String clusterName;
+
+  public MockController(String src, String zkServer, String cluster)
+  {
+    srcName = src;
+    clusterName = cluster;
+    client = new ZkClient(zkServer);
+    client.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  void sendMessage(String msgId, String instanceName, String fromState,
+      String toState, String partitionKey, int partitionId)
+      throws InterruptedException, JsonGenerationException,
+      JsonMappingException, IOException
+  {
+    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+    message.setMsgId(msgId);
+    message.setSrcName(srcName);
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setFromState(fromState);
+    message.setToState(toState);
+    // message.setPartitionId(partitionId);
+    message.setPartitionName(partitionKey);
+
+    String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
+        + message.getId();
+    ObjectMapper mapper = new ObjectMapper();
+    StringWriter sw = new StringWriter();
+    mapper.writeValueUsingView(sw, message, Message.class);
+    System.out.println(sw.toString());
+    client.delete(path);
+
+    Thread.sleep(10000);
+    ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
+        instanceName));
+    message.setTgtSessionId(record.getSimpleField(
+        LiveInstanceProperty.SESSION_ID.toString()).toString());
+    client.createPersistent(path, message);
+  }
+
+  public void createExternalView(List<String> instanceNames, int partitions,
+      int replicas, String dbName, long randomSeed)
+  {
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
+                                                                     replicas, dbName, randomSeed));
+
+    accessor.setProperty(keyBuilder.externalView(dbName), externalView);
+  }
+
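+  // Computes a MASTER/SLAVE assignment: partitions are shuffled with the given
+  // seed, masters are assigned round-robin over the sorted instance list, and each
+  // partition's slaves go to the next `replicas` instances in order.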
+  public ZNRecord computeRoutingTable(List<String> instanceNames,
+      int partitions, int replicas, String dbName, long randomSeed)
+  {
+    assert (instanceNames.size() > replicas);
+    Collections.sort(instanceNames);
+
+    ZNRecord result = new ZNRecord(dbName);
+
+    Map<String, Object> externalView = new TreeMap<String, Object>();
+
+    List<Integer> partitionList = new ArrayList<Integer>(partitions);
+    for (int i = 0; i < partitions; i++)
+    {
+      partitionList.add(i);
+    }
+    Random rand = new Random(randomSeed);
+    // Shuffle the partition list
+    Collections.shuffle(partitionList, rand);
+
+    for (int i = 0; i < partitionList.size(); i++)
+    {
+      int partitionId = partitionList.get(i);
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+      int masterNode = i % instanceNames.size();
+      // the first in the list is the node that contains the master
+      partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
+
+      // the j-th replica is placed on node (masterNode + j) % instanceNames.size()
+      for (int j = 1; j <= replicas; j++)
+      {
+        partitionAssignment
+            .put(instanceNames.get((masterNode + j) % instanceNames.size()),
+                "SLAVE");
+      }
+      String partitionName = dbName + ".partition-" + partitionId;
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
new file mode 100644
index 0000000..480f50a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.controller;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+public class MockControllerProcess
+{
+
+  /**
+   * @param args
+   * @throws IOException
+   * @throws JsonMappingException
+   * @throws JsonGenerationException
+   * @throws InterruptedException
+   */
+  public static void main(String[] args) throws JsonGenerationException,
+      JsonMappingException, InterruptedException, IOException
+  {
+
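+    // Create mock controllers for a storage cluster and a relay cluster, publish an
+    // external view for the relay cluster, and drive one storage partition through
+    // Offline -> Slave -> Master -> Slave -> Offline transitions.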
+    MockController storageController = new MockController("cm-instance-0",
+        "localhost:2181", "storage-cluster");
+    MockController relayController = new MockController("cm-instance-0",
+        "localhost:2181", "relay-cluster");
+
+    ArrayList<String> instanceNames = new ArrayList<String>();
+    instanceNames.add("relay0");
+    instanceNames.add("relay1");
+    instanceNames.add("relay2");
+    instanceNames.add("relay3");
+    instanceNames.add("relay4");
+
+    relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 0);
+
+    // Messages to initiate offline->slave->master->slave transitions
+
+    storageController.sendMessage("TestMessageId1", "localhost_8900",
+        "Offline", "Slave", "EspressoDB.partition-0", 0);
+    Thread.sleep(10000);
+    storageController.sendMessage("TestMessageId2", "localhost_8900", "Slave",
+        "Master", "EspressoDB.partition-0", 0);
+    Thread.sleep(10000);
+    storageController.sendMessage("TestMessageId3", "localhost_8900", "Master",
+        "Slave", "EspressoDB.partition-0", 0);
+    Thread.sleep(10000);
+
+    // Change the external view to trigger the consumer to listen from
+    // another relay
+    relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 10);
+
+    storageController.sendMessage("TestMessageId4", "localhost_8900", "Slave",
+        "Offline", "EspressoDB.partition-0", 0);
+    Thread.sleep(10000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java b/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
new file mode 100644
index 0000000..38465f3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
@@ -0,0 +1,248 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.relay;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+public class DummyRelayProcess
+{
+
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String hostAddress = "host";
+  public static final String hostPort = "port";
+  public static final String relayCluster = "relayCluster";
+  public static final String help = "help";
+  public static final String configFile = "configFile";
+
+  private final String zkConnectString;
+  private final String clusterName;
+  private final String instanceName;
+  private HelixManager manager;
+  private DummyStateModelFactory stateModelFactory;
+  private HelixStateMachineEngine genericStateMachineHandler;
+
+  private final String _clusterViewFile;
+
+  public DummyRelayProcess(String zkConnectString, String clusterName,
+      String instanceName, String clusterViewFile)
+  {
+    this.zkConnectString = zkConnectString;
+    this.clusterName = clusterName;
+    this.instanceName = instanceName;
+    this._clusterViewFile = clusterViewFile;
+  }
+
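+  // Connects either through ZooKeeper or through a static cluster-view file,
+  // registers the OnlineOffline state model factory, and, in file-based mode,
+  // verifies the cluster state after connecting.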
+  public void start() throws Exception
+  {
+    if (_clusterViewFile == null)
+    {
+      manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                          instanceName,
+                                                          InstanceType.PARTICIPANT,
+                                                          zkConnectString);
+    }
+    else
+    {
+      manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
+                                                                  instanceName,
+                                                                  InstanceType.PARTICIPANT,
+                                                                  _clusterViewFile);
+
+    }
+
+    stateModelFactory = new DummyStateModelFactory();
+    // initialize the engine first; it was left null, which would throw an NPE below
+    genericStateMachineHandler = new HelixStateMachineEngine(manager);
+    genericStateMachineHandler.registerStateModelFactory("OnlineOffline", stateModelFactory);
+    manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+    manager.connect();
+    if (_clusterViewFile != null)
+    {
+      ClusterStateVerifier.verifyFileBasedClusterStates(_clusterViewFile, instanceName,
+          stateModelFactory);
+
+    }
+  }
+
+  public static class DummyStateModelFactory extends StateModelFactory<StateModel>
+  {
+    @Override
+    public StateModel createNewStateModel(String stateUnitKey)
+    {
+      System.out.println("Creating state model for "+ stateUnitKey);
+      return new DummyStateModel();
+    }
+
+  }
+
+  public static class DummyStateModel extends StateModel
+  {
+    public void onBecomeOnlineFromOffline(Message message,
+        NotificationContext context)
+    {
+
+      System.out.println("DummyStateModel.onBecomeSlaveFromOffline()");
+    }
+
+    public void onBecomeOfflineFromOnline(Message message,
+        NotificationContext context)
+    {
+      System.out.println("DummyStateModel.onBecomeOfflineFromSlave()");
+
+    }
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+        .withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption = OptionBuilder.withLongOpt(hostPort)
+        .withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    // add an option group including either --zkSvr or --configFile
+    Option fileOption = OptionBuilder.withLongOpt(configFile)
+        .withDescription("Provide file to read states/messages").create();
+    fileOption.setArgs(1);
+    fileOption.setRequired(true);
+    fileOption.setArgName("File to read states/messages (Optional)");
+
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+    optionGroup.addOption(fileOption);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    // options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+      throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      System.err
+          .println("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    String zkConnectString = "pganti-md:2181";
+    String clusterName = "SDR_RELAY";
+    String instanceName = "ela4-rly02.prod.linkedin.com_10015";
+    String file = null;
+
+    if (args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkConnectString = cmd.getOptionValue(zkServer);
+      clusterName = cmd.getOptionValue(cluster);
+
+      String host = cmd.getOptionValue(hostAddress);
+      String portString = cmd.getOptionValue(hostPort);
+      int port = Integer.parseInt(portString);
+      instanceName = host + "_" + port;
+
+      file = cmd.getOptionValue(configFile);
+      if (file != null)
+      {
+        File f = new File(file);
+        if (!f.exists())
+        {
+          System.err.println("static config file doesn't exist");
+          System.exit(1);
+        }
+      }
+
+    }
+    // Espresso_driver.py will consume this
+    System.out.println("Dummy process started");
+
+    DummyRelayProcess process = new DummyRelayProcess(zkConnectString,
+        clusterName, instanceName, file);
+
+    process.start();
+    Thread.currentThread().join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java b/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
new file mode 100644
index 0000000..db326d6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.relay;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class RelayIdealStateGenerator
+{
+  public static void main(String[] args)
+  {
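+    // Build a hand-crafted ideal-state record for relay partitions p22-p27, marking
+    // four relay instances ONLINE for each, then print the serialized ZNRecord.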
+    ZNRecord record = new ZNRecord("SdrRelay");
+    record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "28");
+    for (int i = 22; i < 28; i++)
+    {
+      String key = "ela4-db-sdr.prod.linkedin.com_1521,sdr1,sdr_people_search_,p"
+          + i + ",MASTER";
+      Map<String, String> map = new HashMap<String, String>();
+      for (int j = 0; j < 4; j++)
+      {
+        String instanceName = "ela4-rly0" + j + ".prod.linkedin.com_10015";
+        map.put(instanceName, "ONLINE");
+      }
+      record.getMapFields().put(key, map);
+    }
+
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    System.out.println(new String(serializer.serialize(record)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java b/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
new file mode 100644
index 0000000..d2916e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.router;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * A mock router (spectator) process that demonstrates integration with the
+ * cluster manager. It runs a local in-process ZooKeeper server on port 2188.
+ *
+ * @author kgopalak
+ *
+ */
+public class MockRouterProcess
+{
+  private static final int port = 2188;
+  static long runId = System.currentTimeMillis();
+  private static final String dataDir = "/tmp/zkDataDir-" + runId;
+
+  private static final String logDir = "/tmp/zkLogDir-" + runId;
+
+  static String clusterName = "mock-cluster-" + runId;
+
+  static String zkConnectString = "localhost:2188";
+
+  private final RoutingTableProvider _routingTableProvider;
+  private static ZkServer zkServer;
+
+  public MockRouterProcess()
+  {
+    _routingTableProvider = new RoutingTableProvider();
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    setup();
+    zkServer.getZkClient().setZkSerializer(new ZNRecordSerializer());
+    ZNRecord record = zkServer.getZkClient().readData(
+        HelixUtil.getIdealStatePath(clusterName, "TestDB"));
+
+    String externalViewPath = HelixUtil.getExternalViewPath(clusterName, "TestDB");
+
+    MockRouterProcess process = new MockRouterProcess();
+    process.start();
+    // try to route; no master or slave is available yet
+    process.routeRequest("TestDB", "TestDB_1");
+
+    // publish the external view to zookeeper
+    zkServer.getZkClient().createPersistent(externalViewPath, record);
+    // sleep for some time so that the ZK callback is received
+    Thread.sleep(1000);
+    process.routeRequest("TestDB", "TestDB_1");
+    System.exit(1);
+  }
+
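+  // Starts an in-process ZooKeeper server (clearing any previous cluster namespace)
+  // and creates a test cluster layout via ClusterSetup.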
+  private static void setup()
+  {
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient client)
+      {
+        client.deleteRecursive("/" + clusterName);
+
+      }
+    };
+
+    zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+    ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
+    clusterSetup.setupTestCluster(clusterName);
+    try
+    {
+      Thread.sleep(1000);
+    } catch (InterruptedException e)
+    {
+      e.printStackTrace();
+    }
+  }
+
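+  // Looks up MASTER and then SLAVE instances for the given partition through the
+  // RoutingTableProvider and prints the host and port of each match.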
+  public void routeRequest(String database, String partition)
+  {
+    List<InstanceConfig> masters;
+    List<InstanceConfig> slaves;
+    masters = _routingTableProvider.getInstances(database, partition, "MASTER");
+    if (masters != null && !masters.isEmpty())
+    {
+      System.out.println("Available masters to route request");
+      for (InstanceConfig config : masters)
+      {
+        System.out.println("HostName:" + config.getHostName() + " Port:"
+            + config.getPort());
+      }
+    } else
+    {
+      System.out.println("No masters available to route request");
+    }
+    slaves = _routingTableProvider.getInstances(database, partition, "SLAVE");
+    if (slaves != null && !slaves.isEmpty())
+    {
+      System.out.println("Available slaves to route request");
+      for (InstanceConfig config : slaves)
+      {
+        System.out.println("HostName:" + config.getHostName() + " Port:"
+            + config.getPort());
+      }
+    } else
+    {
+      System.out.println("No slaves available to route request");
+    }
+  }
+
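+  // Connects to the cluster as a SPECTATOR and registers the RoutingTableProvider
+  // as an external-view change listener so routing data stays current.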
+  public void start()
+  {
+
+    try
+    {
+      HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                                         null,
+                                                                         InstanceType.SPECTATOR,
+                                                                         zkConnectString);
+
+
+      manager.connect();
+      manager.addExternalViewChangeListener(_routingTableProvider);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
new file mode 100644
index 0000000..f9bccaa
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.router;
+
+public class RouterAdapter
+{
+
+}