You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[38/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
new file mode 100644
index 0000000..f03d44e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -0,0 +1,243 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestDefaultMessagingService
+{
+ class MockHelixManager extends Mocks.MockManager
+ {
+ class MockDataAccessor extends Mocks.MockAccessor
+ {
+
+ @Override
+ public <T extends HelixProperty> T getProperty(PropertyKey key)
+ {
+
+ PropertyType type = key.getType();
+ if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
+ {
+ return (T) new ExternalView(_externalView);
+ }
+ return null;
+ }
+
+ @Override
+ public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+ {
+ PropertyType type = key.getType();
+ List<T> result = new ArrayList<T>();
+ Class<? extends HelixProperty> clazz = key.getTypeClass();
+ if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
+ {
+ HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
+ result.add((T) typedInstance);
+ return result;
+ }
+ else if(type == PropertyType.LIVEINSTANCES)
+ {
+ return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
+ }
+
+ return result;
+ }
+ }
+
+ HelixDataAccessor _accessor = new MockDataAccessor();
+ ZNRecord _externalView;
+ List<String> _instances;
+ List<ZNRecord> _liveInstances;
+ String _db = "DB";
+ int _replicas = 3;
+ int _partitions = 50;
+
+ public MockHelixManager()
+ {
+ _liveInstances = new ArrayList<ZNRecord>();
+ _instances = new ArrayList<String>();
+ for(int i = 0;i<5; i++)
+ {
+ String instance = "localhost_"+(12918+i);
+ _instances.add(instance);
+ ZNRecord metaData = new ZNRecord(instance);
+ metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+ UUID.randomUUID().toString());
+ _liveInstances.add(metaData);
+ }
+ _externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
+ _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
+
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return true;
+ }
+
+ @Override
+ public HelixDataAccessor getHelixDataAccessor()
+ {
+ return _accessor;
+ }
+
+
+ @Override
+ public String getInstanceName()
+ {
+ return "localhost_12919";
+ }
+
+ @Override
+ public InstanceType getInstanceType()
+ {
+ return InstanceType.PARTICIPANT;
+ }
+ }
+
+ class TestMessageHandlerFactory implements MessageHandlerFactory
+ {
+ class TestMessageHandler extends MessageHandler
+ {
+
+ public TestMessageHandler(Message message, NotificationContext context)
+ {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError( Exception e, ErrorCode code, ErrorType type)
+ {
+ // TODO Auto-generated method stub
+
+ }
+ }
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ // TODO Auto-generated method stub
+ return new TestMessageHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ // TODO Auto-generated method stub
+ return "TestingMessageHandler";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ @Test()
+ public void TestMessageSend()
+ {
+ HelixManager manager = new MockHelixManager();
+ DefaultMessagingService svc = new DefaultMessagingService(manager);
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setInstanceName("localhost_12919");
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setSelfExcluded(true);
+
+ Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
+
+ recipientCriteria.setSelfExcluded(false);
+ AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
+
+
+ recipientCriteria.setSelfExcluded(false);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
+
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("localhost_12920");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("localhost_12920");
+ recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
new file mode 100644
index 0000000..cdd72de
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -0,0 +1,124 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.HashSet;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.TestMessagingService.TestMessagingHandlerFactory.TestMessagingHandler;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.Message;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
+{
+ public static class TestMessagingHandlerFactory implements MessageHandlerFactory
+ {
+ public static HashSet<String> _processedMsgIds = new HashSet<String>();
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ return null;
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return "TestMsg";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ }
+
+ public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
+ {
+ public static HashSet<String> _processedMsgIds = new HashSet<String>();
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ return null;
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return "TestMsg2";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ }
+ @Test
+ public void TestThreadPoolSizeConfig()
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+
+ ConfigAccessor accessor = manager.getConfigAccessor();
+ ConfigScope scope =
+ new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
+ accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
+
+ scope =
+ new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
+ accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
+
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+
+ _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
+ _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
+
+
+ }
+
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+
+ DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
+ HelixTaskExecutor helixExecutor = svc.getExecutor();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
+
+ ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
+ if(i != 0)
+ {
+
+ Assert.assertEquals(8, executor.getMaximumPoolSize());
+ }
+ else
+ {
+ Assert.assertEquals(12, executor.getMaximumPoolSize());
+ }
+ Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
new file mode 100644
index 0000000..aa4ecc8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -0,0 +1,588 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestHelixTaskExecutor
+{
+ public static class MockClusterManager extends Mocks.MockManager
+ {
+ @Override
+ public String getSessionId()
+ {
+ return "123";
+ }
+ }
+
+ class TestMessageHandlerFactory implements MessageHandlerFactory
+ {
+ int _handlersCreated = 0;
+ ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
+ class TestMessageHandler extends MessageHandler
+ {
+ public TestMessageHandler(Message message, NotificationContext context)
+ {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ HelixTaskResult result = new HelixTaskResult();
+ _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+ Thread.currentThread().sleep(100);
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ // TODO Auto-generated method stub
+
+ }
+ }
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ // TODO Auto-generated method stub
+ if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
+ {
+ throw new HelixException("Test Message handler exception, can ignore");
+ }
+ _handlersCreated++;
+ return new TestMessageHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ // TODO Auto-generated method stub
+ return "TestingMessageHandler";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
+ {
+ @Override
+ public String getMessageType()
+ {
+ // TODO Auto-generated method stub
+ return "TestingMessageHandler2";
+ }
+
+ }
+
+ class CancellableHandlerFactory implements MessageHandlerFactory
+ {
+
+ int _handlersCreated = 0;
+ ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
+ ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
+ ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
+ class CancellableHandler extends MessageHandler
+ {
+ public CancellableHandler(Message message, NotificationContext context)
+ {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+ public boolean _interrupted = false;
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ HelixTaskResult result = new HelixTaskResult();
+ int sleepTimes = 15;
+ if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
+ {
+ sleepTimes = 10;
+ }
+ _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
+ try
+ {
+ for (int i = 0; i < sleepTimes; i++)
+ {
+ Thread.sleep(100);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _interrupted = true;
+ _timedOutMsgIds.put(_message.getMsgId(), "");
+ result.setInterrupted(true);
+ if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
+ {
+ _message.getRecord().setSimpleField("Cancelcount", "1");
+ }
+ else
+ {
+ int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
+ _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
+ }
+ throw e;
+ }
+ _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+ result.setSuccess(true);
+ return result;
+ }
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ // TODO Auto-generated method stub
+ _message.getRecord().setSimpleField("exception", e.getMessage());
+ }
+ }
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ // TODO Auto-generated method stub
+ _handlersCreated++;
+ return new CancellableHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ // TODO Auto-generated method stub
+ return "Cancellable";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+ _handlersCreated = 0;
+ _processedMsgIds.clear();
+ _processingMsgIds.clear();
+ _timedOutMsgIds.clear();
+ }
+ }
+
+ @Test ()
+ public void testNormalMsgExecution() throws InterruptedException
+ {
+ System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+ executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs1 = 5;
+ for(int i = 0; i < nMsgs1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setCorrelationId(UUID.randomUUID().toString());
+ msgList.add(msg);
+ }
+
+
+ int nMsgs2 = 6;
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setCorrelationId(UUID.randomUUID().toString());
+ msgList.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+
+ Thread.sleep(1000);
+
+ AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+ AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+ AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
+
+ for(Message record : msgList)
+ {
+ AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
+ AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
+
+ }
+ System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
+ }
+
+ @Test ()
+ public void testUnknownTypeMsgExecution() throws InterruptedException
+ {
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs1 = 5;
+ for(int i = 0; i < nMsgs1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+
+
+ int nMsgs2 = 4;
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+
+ Thread.sleep(1000);
+
+ AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+ AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+ AssertJUnit.assertTrue(factory2._handlersCreated == 0);
+
+ for(Message message : msgList)
+ {
+ if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+ {
+ AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
+ }
+ }
+ }
+
+
+ @Test ()
+ public void testMsgSessionId() throws InterruptedException
+ {
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+ executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs1 = 5;
+ for(int i = 0; i < nMsgs1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName("");
+ msgList.add(msg);
+ }
+
+
+ int nMsgs2 = 4;
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("some other session id");
+ msg.setTgtName("");
+ msgList.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+
+ Thread.sleep(1000);
+
+ AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+ AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+ AssertJUnit.assertTrue(factory2._handlersCreated == 0);
+
+ for(Message message : msgList)
+ {
+ if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+ {
+ AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
+ }
+ }
+ }
+
+ @Test()
+ public void testCreateHandlerException() throws InterruptedException
+ {
+ System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs1 = 5;
+ for(int i = 0; i < nMsgs1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setCorrelationId(UUID.randomUUID().toString());
+ msgList.add(msg);
+ }
+ Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ exceptionMsg.setTgtSessionId(manager.getSessionId());
+ exceptionMsg.setMsgSubType("EXCEPTION");
+ exceptionMsg.setTgtName("Localhost_1123");
+ exceptionMsg.setSrcName("127.101.1.23_2234");
+ exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
+ msgList.add(exceptionMsg);
+
+ executor.onMessage("someInstance", msgList, changeContext);
+
+ Thread.sleep(1000);
+
+ AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+
+ AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
+ System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
+ }
+
+ @Test ()
+ public void testTaskCancellation() throws InterruptedException
+ {
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ CancellableHandlerFactory factory = new CancellableHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs1 = 0;
+ for(int i = 0; i < nMsgs1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+
+ List<Message> msgListToCancel = new ArrayList<Message>();
+ int nMsgs2 = 4;
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msgList.add(msg);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgListToCancel.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+ Thread.sleep(500);
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ executor.cancelTask(msgListToCancel.get(i), changeContext);
+ }
+ Thread.sleep(1500);
+
+ AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
+
+ AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
+
+ for(Message message : msgList)
+ {
+ if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
+ {
+ AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
+ }
+ }
+ }
+
+
+ @Test ()
+ public void testShutdown() throws InterruptedException
+ {
+ System.out.println("START TestCMTaskExecutor.testShutdown()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
+ executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
+
+ CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
+ executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
+ int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
+ List<Message> msgList = new ArrayList<Message>();
+
+ for(int i = 0; i < nMsg1; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+
+ for(int i = 0; i < nMsg2; i++)
+ {
+ Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msgList.add(msg);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+
+ for(int i = 0; i < nMsg3; i++)
+ {
+ Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msgList.add(msg);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msgList.add(msg);
+ }
+ NotificationContext changeContext = new NotificationContext(manager);
+ executor.onMessage("some", msgList, changeContext);
+ Thread.sleep(500);
+ for(ExecutorService svc : executor._threadpoolMap.values())
+ {
+ Assert.assertFalse(svc.isShutdown());
+ }
+ Assert.assertTrue(factory._processedMsgIds.size() > 0);
+ executor.shutDown();
+ for(ExecutorService svc : executor._threadpoolMap.values())
+ {
+ Assert.assertTrue(svc.isShutdown());
+ }
+ System.out.println("END TestCMTaskExecutor.testShutdown()");
+ }
+
+ @Test ()
+ public void testRetryCount() throws InterruptedException
+ {
+ String p = "test_";
+ System.out.println(p.substring(p.lastIndexOf('_')+1));
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+
+ CancellableHandlerFactory factory = new CancellableHandlerFactory();
+ executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+
+ List<Message> msgList = new ArrayList<Message>();
+ int nMsgs2 = 4;
+ // Test the case in which retry = 0
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setExecutionTimeout((i+1) * 600);
+ msgList.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+
+ Thread.sleep(4000);
+
+ AssertJUnit.assertTrue(factory._handlersCreated == nMsgs2);
+ AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
+ //AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
+ for(int i = 0; i<nMsgs2 - 2; i++)
+ {
+ if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
+ {
+ AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
+ AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
+ }
+ }
+ factory.reset();
+ msgList.clear();
+ // Test the case that the message are executed for the second time
+ nMsgs2 = 4;
+ for(int i = 0; i < nMsgs2; i++)
+ {
+ Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setExecutionTimeout((i+1) * 600);
+ msg.setRetryCount(1);
+ msgList.add(msg);
+ }
+ executor.onMessage("someInstance", msgList, changeContext);
+ Thread.sleep(3500);
+ AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
+ AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
+ AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
+ AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
+ AssertJUnit.assertTrue(executor._taskMap.size() == 0);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
new file mode 100644
index 0000000..bf82c26
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -0,0 +1,51 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
+{
+ @Test
+ public void TestThreadPoolSizeConfig()
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ ConfigAccessor accessor = manager.getConfigAccessor();
+ ConfigScope scope =
+ new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
+ accessor.set(scope, HelixTaskExecutor.MAX_THREADS, ""+12);
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ long taskcount = 0;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+
+ DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
+ HelixTaskExecutor helixExecutor = svc.getExecutor();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
+ Assert.assertEquals(12, executor.getMaximumPoolSize());
+ taskcount += executor.getCompletedTaskCount();
+ Assert.assertTrue(executor.getCompletedTaskCount() > 0);
+ }
+ Assert.assertEquals(taskcount, 64 * 4);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
new file mode 100644
index 0000000..8e8b1fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/ConsumerAdapter.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.log4j.Logger;
+
+
+public class ConsumerAdapter implements ExternalViewChangeListener
+{
+
+ HelixManager relayHelixManager;
+ DataAccessor relayClusterClient;
+ private final ConcurrentHashMap<String, RelayConsumer> relayConsumers;
+ private final ConcurrentHashMap<String, RelayConfig> relayConfigs;
+ private static Logger logger = Logger.getLogger(ConsumerAdapter.class);
+
+ public ConsumerAdapter(String instanceName, String zkServer,
+ String clusterName) throws Exception
+ {
+ relayConsumers = new ConcurrentHashMap<String, RelayConsumer>();
+ relayConfigs = new ConcurrentHashMap<String, RelayConfig>();
+
+// relayClusterManager = ClusterManagerFactory.getZKBasedManagerForSpectator(
+// clusterName, zkServer);
+ relayHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
+ null,
+ InstanceType.SPECTATOR,
+ zkServer);
+
+ relayHelixManager.connect();
+ relayHelixManager.addExternalViewChangeListener(this);
+
+ }
+
+ private RelayConfig getRelayConfig(ZNRecord externalView, String partition)
+ {
+ LinkedHashMap<String, String> relayList = (LinkedHashMap<String, String>) externalView
+ .getMapField(partition);
+
+ if (relayList == null)
+ return null;
+
+ return new RelayConfig(relayList);
+
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ logger.info("onExternalViewChange invoked");
+
+ for (ExternalView subview : externalViewList)
+ {
+ Map<String, Map<String, String>> partitions = subview.getRecord().getMapFields();
+
+ for (Entry<String, Map<String, String>> partitionConsumer : partitions
+ .entrySet())
+ {
+ String partition = partitionConsumer.getKey();
+ Map<String, String> relayList = partitionConsumer.getValue();
+ RelayConfig relayConfig = new RelayConfig(relayList);
+ relayConfigs.put(partition, relayConfig);
+ RelayConsumer consumer = relayConsumers.get(partition);
+ if (consumer != null)
+ {
+ consumer.setConfig(relayConfig);
+ }
+ }
+ }
+ }
+
+ public void start()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public RelayConsumer getNewRelayConsumer(String dbName, String partition)
+ throws Exception
+ {
+ RelayConsumer consumer = new RelayConsumer(null, partition);
+
+ if (relayConsumers.putIfAbsent(partition, consumer) != null)
+ {
+ throw new Exception("Existing consumer");
+ }
+ logger.info("created new consumer for partition" + partition);
+
+ RelayConfig relayConfig = relayConfigs.get(partition);
+ if (relayConfig != null)
+ {
+ consumer.setConfig(relayConfig);
+ }
+
+ return consumer;
+ }
+
+ public void removeConsumer(String partition) throws Exception
+ {
+ if (relayConsumers.remove(partition) == null)
+ {
+ throw new Exception("Non Existing consumer for partition " + partition);
+ }
+ logger.info("Removed consumer for partition " + partition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
new file mode 100644
index 0000000..f156ee7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class RelayConfig
+{
+ String partitionId;
+ private ArrayList<String> masterRelays;
+ private ArrayList<String> slaveRelays;
+
+ public RelayConfig(Map<String, String> relayList)
+ {
+
+ masterRelays = new ArrayList<String>();
+ slaveRelays = new ArrayList<String>();
+
+ for (Entry<String, String> entry : relayList.entrySet())
+ {
+ String relayInstance = entry.getKey();
+ String relayState = entry.getValue();
+
+ if (relayState.equals("MASTER"))
+ {
+ masterRelays.add(relayInstance);
+ } else if (relayState.equals("SLAVE"))
+ {
+ slaveRelays.add(relayInstance);
+ }
+ }
+ }
+
+ public String getClusterName()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getzkServer()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getMaster()
+ {
+ if (masterRelays.isEmpty())
+ return null;
+
+ return masterRelays.get(0);
+ }
+
+ public List<String> getRelays()
+ {
+ List<String> relays = new ArrayList<String>();
+ relays.addAll(masterRelays);
+ relays.addAll(slaveRelays);
+
+ return relays;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
new file mode 100644
index 0000000..c4ea610
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/consumer/RelayConsumer.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.consumer;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.log4j.Logger;
+
+
+public class RelayConsumer
+{
+ HelixManager relayHelixManager;
+ DataAccessor relayClusterClient;
+ private final String partition;
+ private RelayConfig currentRelay;
+ private static Logger logger = Logger.getLogger(RelayConsumer.class);
+
+ public RelayConsumer(RelayConfig relayConfig, String partition)
+ {
+ this.partition = partition;
+ this.currentRelay = relayConfig;
+ }
+
+ public void stop()
+ {
+ if (currentRelay != null)
+ {
+ logger.info("RelayConsumer stopping listening from relay "
+ + currentRelay.getMaster());
+ }
+ }
+
+ public boolean isPointingTo(RelayConfig relayConfig)
+ {
+ return false;
+ }
+
+ public void start()
+ {
+ if (currentRelay != null)
+ {
+ logger.info("RelayConsumer starting listening from relay "
+ + currentRelay.getMaster());
+ }
+ }
+
+ /*
+ * This is required at relayConsumer to reach out relays which are hosting
+ * data for slaved partitions.
+ */
+ void getRelaysForPartition(Integer partitionId)
+ {
+
+ }
+
+ public long getHwm()
+ {
+ // TODO this is supposed to return the last checkpoint from this
+ // consumer
+ return 0;
+ }
+
+ public String getPartition()
+ {
+ // TODO Auto-generated method stub
+ return partition;
+ }
+
+ public Object getCurrentRelay()
+ {
+ // TODO Auto-generated method stub
+ return currentRelay;
+ }
+
+ public synchronized void setConfig(RelayConfig relayConfig)
+ {
+ // TODO Auto-generated method stub
+ currentRelay = relayConfig;
+ logger.info("Setting config to relay " + relayConfig.getMaster());
+ }
+
+ public void flush()
+ {
+ assert (currentRelay != null);
+ logger.info("Consumer flushing for partition " + partition);
+ if (currentRelay == null || currentRelay.getRelays() == null)
+ return;
+
+ for (String relay : currentRelay.getRelays())
+ {
+ logger.info("Reading (flush) from relay " + relay);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
new file mode 100644
index 0000000..a06f86a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
@@ -0,0 +1,152 @@
+package org.apache.helix.mock.controller;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class ClusterController extends Thread
+{
+ private static Logger LOG =
+ Logger.getLogger(ClusterController.class);
+
+ private final CountDownLatch _startCountDown = new CountDownLatch(1);
+ private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+ private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+ private final String _controllerMode;
+ private final String _zkAddr;
+
+ private HelixManager _manager;
+
+ public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
+ {
+ this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
+ }
+
+ public ClusterController(String clusterName,
+ String controllerName,
+ String zkAddr,
+ String controllerMode) throws Exception
+ {
+ _controllerMode = controllerMode;
+ _zkAddr = zkAddr;
+
+ if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
+ {
+ _manager =
+ HelixManagerFactory.getZKHelixManager(clusterName,
+ controllerName,
+ InstanceType.CONTROLLER,
+ zkAddr);
+ }
+ else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
+ {
+ _manager =
+ HelixManagerFactory.getZKHelixManager(clusterName,
+ controllerName,
+ InstanceType.CONTROLLER_PARTICIPANT,
+ zkAddr);
+
+ }
+ else
+ {
+ throw new IllegalArgumentException("Controller mode: " + controllerMode
+ + " NOT recoginized");
+ }
+ }
+
+ public HelixManager getManager()
+ {
+ return _manager;
+ }
+
+ public void syncStop()
+ {
+ if (_manager == null)
+ {
+ LOG.warn("manager already stopped");
+ return;
+ }
+
+ _stopCountDown.countDown();
+ try
+ {
+ _waitStopFinishCountDown.await();
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void syncStart()
+ {
+ // TODO: prevent start multiple times
+
+ super.start();
+ try
+ {
+ _startCountDown.await();
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ try
+ {
+ if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
+ {
+ _manager.connect();
+ }
+ else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
+ {
+ DistClusterControllerStateModelFactory stateModelFactory =
+ new DistClusterControllerStateModelFactory(_zkAddr);
+
+ StateMachineEngine stateMach = _manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
+ _manager.connect();
+ }
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ _startCountDown.countDown();
+ _stopCountDown.await();
+ }
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ synchronized (_manager)
+ {
+ _manager.disconnect();
+ _manager = null;
+ }
+ _waitStopFinishCountDown.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
new file mode 100644
index 0000000..6f087e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.controller;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+public class MockController
+{
+ private final ZkClient client;
+ private final String srcName;
+ private final String clusterName;
+
+ public MockController(String src, String zkServer, String cluster)
+ {
+ srcName = src;
+ clusterName = cluster;
+ client = new ZkClient(zkServer);
+ client.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ void sendMessage(String msgId, String instanceName, String fromState,
+ String toState, String partitionKey, int partitionId)
+ throws InterruptedException, JsonGenerationException,
+ JsonMappingException, IOException
+ {
+ Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+ message.setMsgId(msgId);
+ message.setSrcName(srcName);
+ message.setTgtName(instanceName);
+ message.setMsgState(MessageState.NEW);
+ message.setFromState(fromState);
+ message.setToState(toState);
+ // message.setPartitionId(partitionId);
+ message.setPartitionName(partitionKey);
+
+ String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
+ + message.getId();
+ ObjectMapper mapper = new ObjectMapper();
+ StringWriter sw = new StringWriter();
+ mapper.writeValueUsingView(sw, message, Message.class);
+ System.out.println(sw.toString());
+ client.delete(path);
+
+ Thread.sleep(10000);
+ ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
+ instanceName));
+ message.setTgtSessionId(record.getSimpleField(
+ LiveInstanceProperty.SESSION_ID.toString()).toString());
+ client.createPersistent(path, message);
+ }
+
+ public void createExternalView(List<String> instanceNames, int partitions,
+ int replicas, String dbName, long randomSeed)
+ {
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
+ replicas, dbName, randomSeed));
+
+ accessor.setProperty(keyBuilder.externalView(dbName), externalView);
+ }
+
+ public ZNRecord computeRoutingTable(List<String> instanceNames,
+ int partitions, int replicas, String dbName, long randomSeed)
+ {
+ assert (instanceNames.size() > replicas);
+ Collections.sort(instanceNames);
+
+ ZNRecord result = new ZNRecord(dbName);
+
+ Map<String, Object> externalView = new TreeMap<String, Object>();
+
+ List<Integer> partitionList = new ArrayList<Integer>(partitions);
+ for (int i = 0; i < partitions; i++)
+ {
+ partitionList.add(new Integer(i));
+ }
+ Random rand = new Random(randomSeed);
+ // Shuffle the partition list
+ Collections.shuffle(partitionList, rand);
+
+ for (int i = 0; i < partitionList.size(); i++)
+ {
+ int partitionId = partitionList.get(i);
+ Map<String, String> partitionAssignment = new TreeMap<String, String>();
+ int masterNode = i % instanceNames.size();
+ // the first in the list is the node that contains the master
+ partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
+
+ // for the jth replica, we put it on (masterNode + j) % nodes-th
+ // node
+ for (int j = 1; j <= replicas; j++)
+ {
+ partitionAssignment
+ .put(instanceNames.get((masterNode + j) % instanceNames.size()),
+ "SLAVE");
+ }
+ String partitionName = dbName + ".partition-" + partitionId;
+ result.setMapField(partitionName, partitionAssignment);
+ }
+ result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
new file mode 100644
index 0000000..480f50a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.controller;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+public class MockControllerProcess
+{
+
+ /**
+ * @param args
+ * @throws IOException
+ * @throws JsonMappingException
+ * @throws JsonGenerationException
+ * @throws InterruptedException
+ */
+ public static void main(String[] args) throws JsonGenerationException,
+ JsonMappingException, InterruptedException, IOException
+ {
+
+ MockController storageController = new MockController("cm-instance-0",
+ "localhost:2181", "storage-cluster");
+ MockController relayController = new MockController("cm-instance-0",
+ "localhost:2181", "relay-cluster");
+
+ ArrayList<String> instanceNames = new ArrayList<String>();
+ instanceNames.add("relay0");
+ instanceNames.add("relay1");
+ instanceNames.add("relay2");
+ instanceNames.add("relay3");
+ instanceNames.add("relay4");
+
+ relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 0);
+
+ // Messages to initiate offline->slave->master->slave transitions
+
+ storageController.sendMessage("TestMessageId1", "localhost_8900",
+ "Offline", "Slave", "EspressoDB.partition-0", 0);
+ Thread.sleep(10000);
+ storageController.sendMessage("TestMessageId2", "localhost_8900", "Slave",
+ "Master", "EspressoDB.partition-0", 0);
+ Thread.sleep(10000);
+ storageController.sendMessage("TestMessageId3", "localhost_8900", "Master",
+ "Slave", "EspressoDB.partition-0", 0);
+ Thread.sleep(10000);
+
+ // Change the external view to trigger the consumer to listen from
+ // another relay
+ relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 10);
+
+ storageController.sendMessage("TestMessageId4", "localhost_8900", "Slave",
+ "Offline", "EspressoDB.partition-0", 0);
+ Thread.sleep(10000);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java b/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
new file mode 100644
index 0000000..38465f3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/relay/DummyRelayProcess.java
@@ -0,0 +1,248 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.relay;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+public class DummyRelayProcess
+{
+
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String hostAddress = "host";
+ public static final String hostPort = "port";
+ public static final String relayCluster = "relayCluster";
+ public static final String help = "help";
+ public static final String configFile = "configFile";
+
+ private final String zkConnectString;
+ private final String clusterName;
+ private final String instanceName;
+ private HelixManager manager;
+ private DummyStateModelFactory stateModelFactory;
+ private HelixStateMachineEngine genericStateMachineHandler;
+
+ private final String _clusterViewFile;
+
+ public DummyRelayProcess(String zkConnectString, String clusterName,
+ String instanceName, String clusterViewFile)
+ {
+ this.zkConnectString = zkConnectString;
+ this.clusterName = clusterName;
+ this.instanceName = instanceName;
+ this._clusterViewFile = clusterViewFile;
+ }
+
+ public void start() throws Exception
+ {
+ if (_clusterViewFile == null)
+ {
+ manager = HelixManagerFactory.getZKHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ zkConnectString);
+ }
+ else
+ {
+ manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ _clusterViewFile);
+
+ }
+
+ stateModelFactory = new DummyStateModelFactory();
+ genericStateMachineHandler.registerStateModelFactory("OnlineOffline", stateModelFactory);
+ manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+ manager.connect();
+ if (_clusterViewFile != null)
+ {
+ ClusterStateVerifier.verifyFileBasedClusterStates(_clusterViewFile, instanceName,
+ stateModelFactory);
+
+ }
+ }
+
+ public static class DummyStateModelFactory extends StateModelFactory<StateModel>
+ {
+ @Override
+ public StateModel createNewStateModel(String stateUnitKey)
+ {
+ System.out.println("Creating state model for "+ stateUnitKey);
+ return new DummyStateModel();
+ }
+
+ }
+
+ public static class DummyStateModel extends StateModel
+ {
+ public void onBecomeOnlineFromOffline(Message message,
+ NotificationContext context)
+ {
+
+ System.out.println("DummyStateModel.onBecomeSlaveFromOffline()");
+ }
+
+ public void onBecomeOfflineFromOnline(Message message,
+ NotificationContext context)
+ {
+ System.out.println("DummyStateModel.onBecomeOfflineFromSlave()");
+
+ }
+ }
+
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions()
+ {
+ Option helpOption = OptionBuilder.withLongOpt(help)
+ .withDescription("Prints command-line options info").create();
+
+ Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+ .withDescription("Provide zookeeper address").create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption = OptionBuilder.withLongOpt(cluster)
+ .withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+ .withDescription("Provide host name").create();
+ hostOption.setArgs(1);
+ hostOption.setRequired(true);
+ hostOption.setArgName("Host name (Required)");
+
+ Option portOption = OptionBuilder.withLongOpt(hostPort)
+ .withDescription("Provide host port").create();
+ portOption.setArgs(1);
+ portOption.setRequired(true);
+ portOption.setArgName("Host port (Required)");
+
+ // add an option group including either --zkSvr or --configFile
+ Option fileOption = OptionBuilder.withLongOpt(configFile)
+ .withDescription("Provide file to read states/messages").create();
+ fileOption.setArgs(1);
+ fileOption.setRequired(true);
+ fileOption.setArgName("File to read states/messages (Optional)");
+
+ OptionGroup optionGroup = new OptionGroup();
+ optionGroup.addOption(zkServerOption);
+ optionGroup.addOption(fileOption);
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ // options.addOption(zkServerOption);
+ options.addOption(clusterOption);
+ options.addOption(hostOption);
+ options.addOption(portOption);
+
+ options.addOptionGroup(optionGroup);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs)
+ throws Exception
+ {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ CommandLine cmd = null;
+
+ try
+ {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe)
+ {
+ System.err
+ .println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String zkConnectString = "pganti-md:2181";
+ String clusterName = "SDR_RELAY";
+ String instanceName = "ela4-rly02.prod.linkedin.com_10015";
+ String file = null;
+
+ if (args.length > 0)
+ {
+ CommandLine cmd = processCommandLineArgs(args);
+ zkConnectString = cmd.getOptionValue(zkServer);
+ clusterName = cmd.getOptionValue(cluster);
+
+ String host = cmd.getOptionValue(hostAddress);
+ String portString = cmd.getOptionValue(hostPort);
+ int port = Integer.parseInt(portString);
+ instanceName = host + "_" + port;
+
+ file = cmd.getOptionValue(configFile);
+ if (file != null)
+ {
+ File f = new File(file);
+ if (!f.exists())
+ {
+ System.err.println("static config file doesn't exist");
+ System.exit(1);
+ }
+ }
+
+ }
+ // Espresso_driver.py will consume this
+ System.out.println("Dummy process started");
+
+ DummyRelayProcess process = new DummyRelayProcess(zkConnectString,
+ clusterName, instanceName, file);
+
+ process.start();
+ Thread.currentThread().join();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java b/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
new file mode 100644
index 0000000..db326d6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/relay/RelayIdealStateGenerator.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.relay;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class RelayIdealStateGenerator
+{
+ public static void main(String[] args)
+ {
+ ZNRecord record = new ZNRecord("SdrRelay");
+ record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "28");
+ for (int i = 22; i < 28; i++)
+ {
+ String key = "ela4-db-sdr.prod.linkedin.com_1521,sdr1,sdr_people_search_,p"
+ + i + ",MASTER";
+ Map<String, String> map = new HashMap<String, String>();
+ for (int j = 0; j < 4; j++)
+ {
+ String instanceName = "ela4-rly0" + j + ".prod.linkedin.com_10015";
+ map.put(instanceName, "ONLINE");
+ }
+ record.getMapFields().put(key, map);
+ }
+
+ ZNRecordSerializer serializer = new ZNRecordSerializer();
+ System.out.println(new String(serializer.serialize(record)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java b/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
new file mode 100644
index 0000000..d2916e6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/router/MockRouterProcess.java
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.router;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * A MockRouter process to demonstrate the integration with cluster manager.
+ * This uses Zookeeper in local mode and runs at port 2188
+ *
+ * @author kgopalak
+ *
+ */
+public class MockRouterProcess
+{
+ private static final int port = 2188;
+ static long runId = System.currentTimeMillis();
+ private static final String dataDir = "/tmp/zkDataDir-" + runId;
+
+ private static final String logDir = "/tmp/zkLogDir-" + runId;
+
+ static String clusterName = "mock-cluster-" + runId;
+
+ static String zkConnectString = "localhost:2188";
+
+ private final RoutingTableProvider _routingTableProvider;
+ private static ZkServer zkServer;
+
+ public MockRouterProcess()
+ {
+ _routingTableProvider = new RoutingTableProvider();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ setup();
+ zkServer.getZkClient().setZkSerializer(new ZNRecordSerializer());
+ ZNRecord record = zkServer.getZkClient().readData(
+ HelixUtil.getIdealStatePath(clusterName, "TestDB"));
+
+ String externalViewPath = HelixUtil.getExternalViewPath(clusterName, "TestDB");
+
+ MockRouterProcess process = new MockRouterProcess();
+ process.start();
+ //try to route, there is no master or slave available
+ process.routeRequest("TestDB", "TestDB_1");
+
+ //update the externalview on zookeeper
+ zkServer.getZkClient().createPersistent(externalViewPath,record);
+ //sleep for sometime so that the ZK Callback is received.
+ Thread.sleep(1000);
+ process.routeRequest("TestDB", "TestDB_1");
+ System.exit(1);
+ }
+
+ private static void setup()
+ {
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+ {
+ @Override
+ public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient client)
+ {
+ client.deleteRecursive("/" + clusterName);
+
+ }
+ };
+
+ zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ zkServer.start();
+ ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
+ clusterSetup.setupTestCluster(clusterName);
+ try
+ {
+ Thread.sleep(1000);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public void routeRequest(String database, String partition)
+ {
+ List<InstanceConfig> masters;
+ List<InstanceConfig> slaves;
+ masters = _routingTableProvider.getInstances(database, partition, "MASTER");
+ if (masters != null && !masters.isEmpty())
+ {
+ System.out.println("Available masters to route request");
+ for (InstanceConfig config : masters)
+ {
+ System.out.println("HostName:" + config.getHostName() + " Port:"
+ + config.getPort());
+ }
+ } else
+ {
+ System.out.println("No masters available to route request");
+ }
+ slaves = _routingTableProvider.getInstances(database, partition, "SLAVE");
+ if (slaves != null && !slaves.isEmpty())
+ {
+ System.out.println("Available slaves to route request");
+ for (InstanceConfig config : slaves)
+ {
+ System.out.println("HostName:" + config.getHostName() + " Port:"
+ + config.getPort());
+ }
+ } else
+ {
+ System.out.println("No slaves available to route request");
+ }
+ }
+
+ public void start()
+ {
+
+ try
+ {
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
+ null,
+ InstanceType.SPECTATOR,
+ zkConnectString);
+
+
+ manager.connect();
+ manager.addExternalViewChangeListener(_routingTableProvider);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
new file mode 100644
index 0000000..f9bccaa
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/router/RouterAdapter.java
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.router;
+
+public class RouterAdapter
+{
+
+}