You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[42/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
deleted file mode 100644
index fa6e18e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkClusterManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.manager.MockListener;
-import com.linkedin.helix.store.PropertyStore;
-
-public class TestZkClusterManager extends ZkUnitTestBase
-{
- final String className = getShortClassName();
-
- @Test()
- public void testController() throws Exception
- {
- System.out.println("START " + className + ".testController() at " + new Date(System.currentTimeMillis()));
- final String clusterName = CLUSTER_PREFIX + "_" + className + "_controller";
-
- // basic test
- if (_gZkClient.exists("/" + clusterName))
- {
- _gZkClient.deleteRecursive("/" + clusterName);
- }
-
- ZKHelixManager controller = new ZKHelixManager(clusterName, null,
- InstanceType.CONTROLLER,
- ZK_ADDR);
- try
- {
- controller.connect();
- Assert.fail("Should throw HelixException if initial cluster structure is not setup");
- } catch (HelixException e)
- {
- // OK
- }
-
- TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-
- controller.connect();
- AssertJUnit.assertTrue(controller.isConnected());
- controller.connect();
- AssertJUnit.assertTrue(controller.isConnected());
-
- MockListener listener = new MockListener();
- listener.reset();
-
- try
- {
- controller.addControllerListener(null);
- Assert.fail("Should throw HelixException");
- } catch (HelixException e)
- {
- // OK
- }
-
- controller.addControllerListener(listener);
- AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
- controller.removeListener(listener);
-
- PropertyStore<ZNRecord> store = controller.getPropertyStore();
- ZNRecord record = new ZNRecord("node_1");
- store.setProperty("node_1", record);
- record = store.getProperty("node_1");
- AssertJUnit.assertEquals("node_1", record.getId());
-
- controller.getMessagingService();
- controller.getHealthReportCollector();
- controller.getClusterManagmentTool();
-
- controller.handleNewSession();
- controller.disconnect();
- AssertJUnit.assertFalse(controller.isConnected());
-
- System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
- }
-
- @Test()
- public void testAdministrator() throws Exception
- {
- System.out.println("START " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
- final String clusterName = CLUSTER_PREFIX + "_" + className + "_admin";
-
- // basic test
- if (_gZkClient.exists("/" + clusterName))
- {
- _gZkClient.deleteRecursive("/" + clusterName);
- }
-
- ZKHelixManager admin = new ZKHelixManager(clusterName, null,
- InstanceType.ADMINISTRATOR,
- ZK_ADDR);
-
- TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-
- admin.connect();
- AssertJUnit.assertTrue(admin.isConnected());
-
- HelixAdmin adminTool = admin.getClusterManagmentTool();
- ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
- .forResource("testResource").forPartition("testPartition").build();
- Map<String, String> properties = new HashMap<String, String>();
- properties.put("pKey1", "pValue1");
- properties.put("pKey2", "pValue2");
- adminTool.setConfig(scope, properties);
-
- properties = adminTool.getConfig(scope, TestHelper.setOf("pKey1", "pKey2"));
- Assert.assertEquals(properties.size(), 2);
- Assert.assertEquals(properties.get("pKey1"), "pValue1");
- Assert.assertEquals(properties.get("pKey2"), "pValue2");
-
- admin.disconnect();
- AssertJUnit.assertFalse(admin.isConnected());
-
- System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
deleted file mode 100644
index 1bd2d27..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/manager/zk/TestZkHelixAdmin.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.StateModelDefinition;
-
-public class TestZkHelixAdmin extends ZkUnitTestBase
-{
- @Test()
- public void testZkHelixAdmin()
- {
- System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
-
- final String clusterName = getShortClassName();
- if (_gZkClient.exists("/" + clusterName))
- {
- _gZkClient.deleteRecursive("/" + clusterName);
- }
-
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
- tool.addCluster(clusterName, true);
- Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
- tool.addCluster(clusterName, true);
- Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
-
- List<String> list = tool.getClusters();
- AssertJUnit.assertTrue(list.size() > 0);
-
- try
- {
- tool.addCluster(clusterName, false);
- Assert.fail("should fail if add an already existing cluster");
- } catch (HelixException e)
- {
- // OK
- }
-
- InstanceConfig config = new InstanceConfig("host1_9999");
- config.setHostName("host1");
- config.setPort("9999");
- tool.addInstance(clusterName, config);
- tool.enableInstance(clusterName, "host1_9999", true);
- String path = PropertyPathConfig.getPath(PropertyType.INSTANCES,
- clusterName, "host1_9999");
- AssertJUnit.assertTrue(_gZkClient.exists(path));
-
- try
- {
- tool.addInstance(clusterName, config);
- Assert.fail("should fail if add an alredy-existing instance");
- } catch (HelixException e)
- {
- // OK
- }
- config = tool.getInstanceConfig(clusterName, "host1_9999");
- AssertJUnit.assertEquals(config.getId(), "host1_9999");
-
- tool.dropInstance(clusterName, config);
- try
- {
- tool.getInstanceConfig(clusterName, "host1_9999");
- Assert.fail("should fail if get a non-existent instance");
- } catch (HelixException e)
- {
- // OK
- }
- try
- {
- tool.dropInstance(clusterName, config);
- Assert.fail("should fail if drop on a non-existent instance");
- } catch (HelixException e)
- {
- // OK
- }
- try
- {
- tool.enableInstance(clusterName, "host1_9999", false);
- Assert.fail("should fail if enable a non-existent instance");
- } catch (HelixException e)
- {
- // OK
- }
- ZNRecord stateModelRecord = new ZNRecord("id1");
- try
- {
- tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
- stateModelRecord));
- path = PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
- clusterName, "id1");
- AssertJUnit.assertTrue(_gZkClient.exists(path));
- Assert.fail("should fail");
- } catch (HelixException e)
- {
- // OK
- }
- try
- {
- tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(
- stateModelRecord));
- Assert.fail("should fail if add an already-existing state model");
- } catch (HelixException e)
- {
- // OK
- }
- list = tool.getStateModelDefs(clusterName);
- AssertJUnit.assertEquals(list.size(), 0);
-
- try
- {
- tool.addResource(clusterName, "resource", 10,
- "nonexistStateModelDef");
- Assert
- .fail("should fail if add a resource without an existing state model");
- } catch (HelixException e)
- {
- // OK
- }
- try
- {
- tool.addResource(clusterName, "resource", 10, "id1");
- Assert.fail("should fail");
- } catch (HelixException e)
- {
- // OK
- }
- list = tool.getResourcesInCluster(clusterName);
- AssertJUnit.assertEquals(list.size(), 0);
- try
- {
- tool.addResource(clusterName, "resource", 10, "id1");
- Assert.fail("should fail");
- } catch (HelixException e)
- {
- // OK
- }
- list = tool.getResourcesInCluster(clusterName);
- AssertJUnit.assertEquals(list.size(), 0);
-
- ExternalView resourceExternalView = tool.getResourceExternalView(
- clusterName, "resource");
- AssertJUnit.assertNull(resourceExternalView);
-
- // test config support
- ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName)
- .forResource("testResource").forPartition("testPartition").build();
- Map<String, String> properties = new HashMap<String, String>();
- properties.put("pKey1", "pValue1");
- properties.put("pKey2", "pValue2");
-
- // make sure calling set/getConfig() many times will not drain zkClient resources
- // int nbOfZkClients = ZkClient.getNumberOfConnections();
- for (int i = 0; i < 100; i++)
- {
- tool.setConfig(scope, properties);
- Map<String, String> newProperties = tool.getConfig(scope, properties.keySet());
- Assert.assertEquals(newProperties.size(), 2);
- Assert.assertEquals(newProperties.get("pKey1"), "pValue1");
- Assert.assertEquals(newProperties.get("pKey2"), "pValue2");
- }
- // Assert.assertTrue(ZkClient.getNumberOfConnections() - nbOfZkClients < 5);
-
- System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
deleted file mode 100644
index 31c5d25..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallback.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.model.Message;
-
-public class TestAsyncCallback
-{
- class AsyncCallbackSample extends AsyncCallback
- {
- int _onTimeOutCalled = 0;
- int _onReplyMessageCalled = 0;
- @Override
- public void onTimeOut()
- {
- // TODO Auto-generated method stub
- _onTimeOutCalled ++;
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- _onReplyMessageCalled++;
- }
- }
-
- @Test()
- public void testAsyncCallback() throws Exception
- {
- System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
- AsyncCallbackSample callback = new AsyncCallbackSample();
- AssertJUnit.assertFalse(callback.isInterrupted());
- AssertJUnit.assertFalse(callback.isTimedOut());
- AssertJUnit.assertTrue(callback.getMessageReplied().size() == 0);
-
- int nMsgs = 5;
-
- List<Message> messageSent = new ArrayList<Message>();
- for(int i = 0;i < nMsgs; i++)
- {
- messageSent.add(new Message("Test", UUID.randomUUID().toString()));
- }
-
- callback.setMessagesSent(messageSent);
-
- for(int i = 0;i < nMsgs; i++)
- {
- AssertJUnit.assertFalse(callback.isDone());
- callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
- }
- AssertJUnit.assertTrue(callback.isDone());
-
- AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
-
- sleep(50);
- callback = new AsyncCallbackSample();
- callback.setMessagesSent(messageSent);
- callback.setTimeout(1000);
- sleep(50);
- callback.startTimer();
- AssertJUnit.assertFalse(callback.isTimedOut());
- for(int i = 0;i < nMsgs - 1; i++)
- {
- sleep(50);
- AssertJUnit.assertFalse(callback.isDone());
- AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
- callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
- }
- sleep(1000);
- AssertJUnit.assertTrue(callback.isTimedOut());
- AssertJUnit.assertTrue(callback._onTimeOutCalled == 1 );
- AssertJUnit.assertFalse(callback.isDone());
-
- callback = new AsyncCallbackSample();
- callback.setMessagesSent(messageSent);
- callback.setTimeout(1000);
- callback.startTimer();
- sleep(50);
- AssertJUnit.assertFalse(callback.isTimedOut());
- for(int i = 0;i < nMsgs; i++)
- {
- AssertJUnit.assertFalse(callback.isDone());
- sleep(50);
- AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
- callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
- }
- AssertJUnit.assertTrue(callback.isDone());
- sleep(1300);
- AssertJUnit.assertFalse(callback.isTimedOut());
- AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
- System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
- }
-
- void sleep(int time)
- {
- try
- {
- Thread.sleep(time);
- }
- catch(Exception e)
- {
- System.out.println(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
deleted file mode 100644
index e44e707..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestAsyncCallbackSvc.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import org.testng.annotations.Test;
-import org.testng.AssertJUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.AsyncCallbackService;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.TestHelixTaskExecutor.MockClusterManager;
-import com.linkedin.helix.model.Message;
-
-public class TestAsyncCallbackSvc
-{
- class MockHelixManager extends Mocks.MockManager
- {
- public String getSessionId()
- {
- return "123";
- }
- }
-
- class TestAsyncCallback extends AsyncCallback
- {
- HashSet<String> _repliedMessageId = new HashSet<String>();
- @Override
- public void onTimeOut()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- // TODO Auto-generated method stub
- _repliedMessageId.add(message.getMsgId());
- }
-
- }
- @Test(groups =
- { "unitTest" })
- public void testAsyncCallbackSvc() throws Exception
- {
- AsyncCallbackService svc = new AsyncCallbackService();
- HelixManager manager = new MockHelixManager();
- NotificationContext changeContext = new NotificationContext(manager);
-
- Message msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- try
- {
- MessageHandler aHandler = svc.createHandler(msg, changeContext);
- }
- catch(HelixException e)
- {
- AssertJUnit.assertTrue(e.getMessage().indexOf(msg.getMsgId())!= -1);
- }
- Message msg2 = new Message("RandomType", UUID.randomUUID().toString());
- msg2.setTgtSessionId(manager.getSessionId());
- try
- {
- MessageHandler aHandler = svc.createHandler(msg2, changeContext);
- }
- catch(HelixException e)
- {
- AssertJUnit.assertTrue(e.getMessage().indexOf(msg2.getMsgId())!= -1);
- }
- Message msg3 = new Message(svc.getMessageType(), UUID.randomUUID().toString());
- msg3.setTgtSessionId(manager.getSessionId());
- msg3.setCorrelationId("wfwegw");
- try
- {
- MessageHandler aHandler = svc.createHandler(msg3, changeContext);
- }
- catch(HelixException e)
- {
- AssertJUnit.assertTrue(e.getMessage().indexOf(msg3.getMsgId())!= -1);
- }
-
- TestAsyncCallback callback = new TestAsyncCallback();
- String corrId = UUID.randomUUID().toString();
- svc.registerAsyncCallback(corrId, new TestAsyncCallback());
- svc.registerAsyncCallback(corrId, callback);
-
- List<Message> msgSent = new ArrayList<Message>();
- msgSent.add(new Message("Test", UUID.randomUUID().toString()));
- callback.setMessagesSent(msgSent);
-
- msg = new Message(svc.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setCorrelationId(corrId);
-
- MessageHandler aHandler = svc.createHandler(msg, changeContext);
- Map<String, String> resultMap = new HashMap<String, String>();
- aHandler.handleMessage();
-
- AssertJUnit.assertTrue(callback.isDone());
- AssertJUnit.assertTrue(callback._repliedMessageId.contains(msg.getMsgId()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
deleted file mode 100644
index 16bfc03..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/TestDefaultMessagingService.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestDefaultMessagingService
-{
- class MockHelixManager extends Mocks.MockManager
- {
- class MockDataAccessor extends Mocks.MockAccessor
- {
-
- @Override
- public <T extends HelixProperty> T getProperty(PropertyKey key)
- {
-
- PropertyType type = key.getType();
- if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
- {
- return (T) new ExternalView(_externalView);
- }
- return null;
- }
-
- @Override
- public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
- {
- PropertyType type = key.getType();
- List<T> result = new ArrayList<T>();
- Class<? extends HelixProperty> clazz = key.getTypeClass();
- if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
- {
- HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
- result.add((T) typedInstance);
- return result;
- }
- else if(type == PropertyType.LIVEINSTANCES)
- {
- return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
- }
-
- return result;
- }
- }
-
- HelixDataAccessor _accessor = new MockDataAccessor();
- ZNRecord _externalView;
- List<String> _instances;
- List<ZNRecord> _liveInstances;
- String _db = "DB";
- int _replicas = 3;
- int _partitions = 50;
-
- public MockHelixManager()
- {
- _liveInstances = new ArrayList<ZNRecord>();
- _instances = new ArrayList<String>();
- for(int i = 0;i<5; i++)
- {
- String instance = "localhost_"+(12918+i);
- _instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
- UUID.randomUUID().toString());
- _liveInstances.add(metaData);
- }
- _externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
- _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
-
- }
-
- @Override
- public boolean isConnected()
- {
- return true;
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor()
- {
- return _accessor;
- }
-
-
- @Override
- public String getInstanceName()
- {
- return "localhost_12919";
- }
-
- @Override
- public InstanceType getInstanceType()
- {
- return InstanceType.PARTICIPANT;
- }
- }
-
- class TestMessageHandlerFactory implements MessageHandlerFactory
- {
- class TestMessageHandler extends MessageHandler
- {
-
- public TestMessageHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError( Exception e, ErrorCode code, ErrorType type)
- {
- // TODO Auto-generated method stub
-
- }
- }
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- // TODO Auto-generated method stub
- return new TestMessageHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- // TODO Auto-generated method stub
- return "TestingMessageHandler";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
-
- }
- }
-
- @Test()
- public void TestMessageSend()
- {
- HelixManager manager = new MockHelixManager();
- DefaultMessagingService svc = new DefaultMessagingService(manager);
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setInstanceName("localhost_12919");
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setSelfExcluded(true);
-
- Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
-
- recipientCriteria.setSelfExcluded(false);
- AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
-
-
- recipientCriteria.setSelfExcluded(false);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource("DB");
- recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
-
- recipientCriteria.setSelfExcluded(true);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource("DB");
- recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
-
- recipientCriteria.setSelfExcluded(true);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource("DB");
- recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
-
- recipientCriteria.setSelfExcluded(true);
- recipientCriteria.setInstanceName("localhost_12920");
- recipientCriteria.setResource("DB");
- recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
-
-
- recipientCriteria.setSelfExcluded(true);
- recipientCriteria.setInstanceName("localhost_12920");
- recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
- recipientCriteria.setResource("DB");
- recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
deleted file mode 100644
index 08c2f86..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.HashSet;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBase;
-import com.linkedin.helix.integration.TestMessagingService.TestMessagingHandlerFactory.TestMessagingHandler;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.HelixTaskExecutor;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.Message;
-
-public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
-{
- public static class TestMessagingHandlerFactory implements MessageHandlerFactory
- {
- public static HashSet<String> _processedMsgIds = new HashSet<String>();
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- return null;
- }
-
- @Override
- public String getMessageType()
- {
- return "TestMsg";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
- }
-
- }
-
- public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
- {
- public static HashSet<String> _processedMsgIds = new HashSet<String>();
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- return null;
- }
-
- @Override
- public String getMessageType()
- {
- return "TestMsg2";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
- }
-
- }
- @Test
- public void TestThreadPoolSizeConfig()
- {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
-
- ConfigAccessor accessor = manager.getConfigAccessor();
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
- accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
-
- scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
- accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
-
- for (int i = 0; i < NODE_NR; i++)
- {
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-
- _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
- _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
-
-
- }
-
- for (int i = 0; i < NODE_NR; i++)
- {
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-
- DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
- HelixTaskExecutor helixExecutor = svc.getExecutor();
- ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg"));
-
- ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get("TestMsg2"));
- if(i != 0)
- {
-
- Assert.assertEquals(8, executor.getMaximumPoolSize());
- }
- else
- {
- Assert.assertEquals(12, executor.getMaximumPoolSize());
- }
- Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
deleted file mode 100644
index 2a4619e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestHelixTaskExecutor.java
+++ /dev/null
@@ -1,584 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-
-public class TestHelixTaskExecutor
-{
- public static class MockClusterManager extends Mocks.MockManager
- {
- @Override
- public String getSessionId()
- {
- return "123";
- }
- }
-
- class TestMessageHandlerFactory implements MessageHandlerFactory
- {
- int _handlersCreated = 0;
- ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
- class TestMessageHandler extends MessageHandler
- {
- public TestMessageHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
- Thread.currentThread().sleep(100);
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- // TODO Auto-generated method stub
-
- }
- }
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- // TODO Auto-generated method stub
- if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
- {
- throw new HelixException("Test Message handler exception, can ignore");
- }
- _handlersCreated++;
- return new TestMessageHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- // TODO Auto-generated method stub
- return "TestingMessageHandler";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
-
- }
- }
-
- class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
- {
- @Override
- public String getMessageType()
- {
- // TODO Auto-generated method stub
- return "TestingMessageHandler2";
- }
-
- }
-
- class CancellableHandlerFactory implements MessageHandlerFactory
- {
-
- int _handlersCreated = 0;
- ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
- ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
- ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
- class CancellableHandler extends MessageHandler
- {
- public CancellableHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
- public boolean _interrupted = false;
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- int sleepTimes = 15;
- if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
- {
- sleepTimes = 10;
- }
- _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
- try
- {
- for (int i = 0; i < sleepTimes; i++)
- {
- Thread.sleep(100);
- }
- }
- catch (InterruptedException e)
- {
- _interrupted = true;
- _timedOutMsgIds.put(_message.getMsgId(), "");
- result.setInterrupted(true);
- if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
- {
- _message.getRecord().setSimpleField("Cancelcount", "1");
- }
- else
- {
- int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
- _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
- }
- throw e;
- }
- _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
- result.setSuccess(true);
- return result;
- }
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- // TODO Auto-generated method stub
- _message.getRecord().setSimpleField("exception", e.getMessage());
- }
- }
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- // TODO Auto-generated method stub
- _handlersCreated++;
- return new CancellableHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- // TODO Auto-generated method stub
- return "Cancellable";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
- _handlersCreated = 0;
- _processedMsgIds.clear();
- _processingMsgIds.clear();
- _timedOutMsgIds.clear();
- }
- }
-
- @Test ()
- public void testNormalMsgExecution() throws InterruptedException
- {
- System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
- NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
-
- int nMsgs1 = 5;
- for(int i = 0; i < nMsgs1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(msg);
- }
-
-
- int nMsgs2 = 6;
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
-
- Thread.sleep(1000);
-
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
- AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
-
- for(Message record : msgList)
- {
- AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
- AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
-
- }
- System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
- }
-
- @Test ()
- public void testUnknownTypeMsgExecution() throws InterruptedException
- {
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
-
- NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
-
- int nMsgs1 = 5;
- for(int i = 0; i < nMsgs1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
-
-
- int nMsgs2 = 4;
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
-
- Thread.sleep(1000);
-
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
- AssertJUnit.assertTrue(factory2._handlersCreated == 0);
-
- for(Message message : msgList)
- {
- if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
- {
- AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
- }
- }
- }
-
-
- @Test ()
- public void testMsgSessionId() throws InterruptedException
- {
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
- NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
-
- int nMsgs1 = 5;
- for(int i = 0; i < nMsgs1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setTgtName("");
- msgList.add(msg);
- }
-
-
- int nMsgs2 = 4;
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("some other session id");
- msg.setTgtName("");
- msgList.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
-
- Thread.sleep(1000);
-
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
- AssertJUnit.assertTrue(factory2._handlersCreated == 0);
-
- for(Message message : msgList)
- {
- if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
- {
- AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
- }
- }
- }
-
- @Test()
- public void testCreateHandlerException() throws InterruptedException
- {
- System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
-
- NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
-
- int nMsgs1 = 5;
- for(int i = 0; i < nMsgs1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId(manager.getSessionId());
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(msg);
- }
- Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- exceptionMsg.setTgtSessionId(manager.getSessionId());
- exceptionMsg.setMsgSubType("EXCEPTION");
- exceptionMsg.setTgtName("Localhost_1123");
- exceptionMsg.setSrcName("127.101.1.23_2234");
- exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(exceptionMsg);
-
- executor.onMessage("someInstance", msgList, changeContext);
-
- Thread.sleep(1000);
-
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
-
- AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
- System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
- }
-
- @Test ()
- public void testTaskCancellation() throws InterruptedException
- {
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- CancellableHandlerFactory factory = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
-
- int nMsgs1 = 0;
- for(int i = 0; i < nMsgs1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
-
- List<Message> msgListToCancel = new ArrayList<Message>();
- int nMsgs2 = 4;
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msgList.add(msg);
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgListToCancel.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
- Thread.sleep(500);
- for(int i = 0; i < nMsgs2; i++)
- {
- executor.cancelTask(msgListToCancel.get(i), changeContext);
- }
- Thread.sleep(1500);
-
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
-
- AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
-
- for(Message message : msgList)
- {
- if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
- {
- AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
- }
- }
- }
-
-
- @Test ()
- public void testShutdown() throws InterruptedException
- {
- System.out.println("START TestCMTaskExecutor.testShutdown()");
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
- executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
-
- CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
- int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
- List<Message> msgList = new ArrayList<Message>();
-
- for(int i = 0; i < nMsg1; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
-
- for(int i = 0; i < nMsg2; i++)
- {
- Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msgList.add(msg);
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
-
- for(int i = 0; i < nMsg3; i++)
- {
- Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msgList.add(msg);
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msgList.add(msg);
- }
- NotificationContext changeContext = new NotificationContext(manager);
- executor.onMessage("some", msgList, changeContext);
- Thread.sleep(500);
- for(ExecutorService svc : executor._threadpoolMap.values())
- {
- Assert.assertFalse(svc.isShutdown());
- }
- Assert.assertTrue(factory._processedMsgIds.size() > 0);
- executor.shutDown();
- for(ExecutorService svc : executor._threadpoolMap.values())
- {
- Assert.assertTrue(svc.isShutdown());
- }
- System.out.println("END TestCMTaskExecutor.testShutdown()");
- }
-
- @Test ()
- public void testRetryCount() throws InterruptedException
- {
- String p = "test_";
- System.out.println(p.substring(p.lastIndexOf('_')+1));
- HelixTaskExecutor executor = new HelixTaskExecutor();
- HelixManager manager = new MockClusterManager();
-
- CancellableHandlerFactory factory = new CancellableHandlerFactory();
- executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- NotificationContext changeContext = new NotificationContext(manager);
-
- List<Message> msgList = new ArrayList<Message>();
- int nMsgs2 = 4;
- // Test the case in which retry = 0
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msg.setExecutionTimeout((i+1) * 600);
- msgList.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
-
- Thread.sleep(4000);
-
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs2);
- AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
- //AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
- for(int i = 0; i<nMsgs2 - 2; i++)
- {
- if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
- {
- AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
- AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
- }
- }
- factory.reset();
- msgList.clear();
- // Test the case that the message are executed for the second time
- nMsgs2 = 4;
- for(int i = 0; i < nMsgs2; i++)
- {
- Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
- msg.setTgtSessionId("*");
- msg.setTgtName("Localhost_1123");
- msg.setSrcName("127.101.1.23_2234");
- msg.setExecutionTimeout((i+1) * 600);
- msg.setRetryCount(1);
- msgList.add(msg);
- }
- executor.onMessage("someInstance", msgList, changeContext);
- Thread.sleep(3500);
- AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
- AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
- AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
- AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
- AssertJUnit.assertTrue(executor._taskMap.size() == 0);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
deleted file mode 100644
index 3c62a1e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBase;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase
-{
- @Test
- public void TestThreadPoolSizeConfig()
- {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
- ConfigAccessor accessor = manager.getConfigAccessor();
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
- accessor.set(scope, HelixTaskExecutor.MAX_THREADS, ""+12);
-
- _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
- _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
-
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- long taskcount = 0;
- for (int i = 0; i < NODE_NR; i++)
- {
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-
- DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
- HelixTaskExecutor helixExecutor = svc.getExecutor();
- ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._threadpoolMap.get(MessageType.STATE_TRANSITION + "." + "NextDB"));
- Assert.assertEquals(12, executor.getMaximumPoolSize());
- taskcount += executor.getCompletedTaskCount();
- Assert.assertTrue(executor.getCompletedTaskCount() > 0);
- }
- Assert.assertEquals(taskcount, 64 * 4);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
deleted file mode 100644
index 7138171..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/ConsumerAdapter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.ExternalView;
-
-public class ConsumerAdapter implements ExternalViewChangeListener
-{
-
- HelixManager relayHelixManager;
- DataAccessor relayClusterClient;
- private final ConcurrentHashMap<String, RelayConsumer> relayConsumers;
- private final ConcurrentHashMap<String, RelayConfig> relayConfigs;
- private static Logger logger = Logger.getLogger(ConsumerAdapter.class);
-
- public ConsumerAdapter(String instanceName, String zkServer,
- String clusterName) throws Exception
- {
- relayConsumers = new ConcurrentHashMap<String, RelayConsumer>();
- relayConfigs = new ConcurrentHashMap<String, RelayConfig>();
-
-// relayClusterManager = ClusterManagerFactory.getZKBasedManagerForSpectator(
-// clusterName, zkServer);
- relayHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
- null,
- InstanceType.SPECTATOR,
- zkServer);
-
- relayHelixManager.connect();
- relayHelixManager.addExternalViewChangeListener(this);
-
- }
-
- private RelayConfig getRelayConfig(ZNRecord externalView, String partition)
- {
- LinkedHashMap<String, String> relayList = (LinkedHashMap<String, String>) externalView
- .getMapField(partition);
-
- if (relayList == null)
- return null;
-
- return new RelayConfig(relayList);
-
- }
-
- @Override
- public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext)
- {
- logger.info("onExternalViewChange invoked");
-
- for (ExternalView subview : externalViewList)
- {
- Map<String, Map<String, String>> partitions = subview.getRecord().getMapFields();
-
- for (Entry<String, Map<String, String>> partitionConsumer : partitions
- .entrySet())
- {
- String partition = partitionConsumer.getKey();
- Map<String, String> relayList = partitionConsumer.getValue();
- RelayConfig relayConfig = new RelayConfig(relayList);
- relayConfigs.put(partition, relayConfig);
- RelayConsumer consumer = relayConsumers.get(partition);
- if (consumer != null)
- {
- consumer.setConfig(relayConfig);
- }
- }
- }
- }
-
- public void start()
- {
- // TODO Auto-generated method stub
-
- }
-
- public RelayConsumer getNewRelayConsumer(String dbName, String partition)
- throws Exception
- {
- RelayConsumer consumer = new RelayConsumer(null, partition);
-
- if (relayConsumers.putIfAbsent(partition, consumer) != null)
- {
- throw new Exception("Existing consumer");
- }
- logger.info("created new consumer for partition" + partition);
-
- RelayConfig relayConfig = relayConfigs.get(partition);
- if (relayConfig != null)
- {
- consumer.setConfig(relayConfig);
- }
-
- return consumer;
- }
-
- public void removeConsumer(String partition) throws Exception
- {
- if (relayConsumers.remove(partition) == null)
- {
- throw new Exception("Non Existing consumer for partition " + partition);
- }
- logger.info("Removed consumer for partition " + partition);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
deleted file mode 100644
index f75c213..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-public class RelayConfig
-{
- String partitionId;
- private ArrayList<String> masterRelays;
- private ArrayList<String> slaveRelays;
-
- public RelayConfig(Map<String, String> relayList)
- {
-
- masterRelays = new ArrayList<String>();
- slaveRelays = new ArrayList<String>();
-
- for (Entry<String, String> entry : relayList.entrySet())
- {
- String relayInstance = entry.getKey();
- String relayState = entry.getValue();
-
- if (relayState.equals("MASTER"))
- {
- masterRelays.add(relayInstance);
- } else if (relayState.equals("SLAVE"))
- {
- slaveRelays.add(relayInstance);
- }
- }
- }
-
- public String getClusterName()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public String getzkServer()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public String getMaster()
- {
- if (masterRelays.isEmpty())
- return null;
-
- return masterRelays.get(0);
- }
-
- public List<String> getRelays()
- {
- List<String> relays = new ArrayList<String>();
- relays.addAll(masterRelays);
- relays.addAll(slaveRelays);
-
- return relays;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java b/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
deleted file mode 100644
index 4f9b649..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/consumer/RelayConsumer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.consumer;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-
-public class RelayConsumer
-{
- HelixManager relayHelixManager;
- DataAccessor relayClusterClient;
- private final String partition;
- private RelayConfig currentRelay;
- private static Logger logger = Logger.getLogger(RelayConsumer.class);
-
- public RelayConsumer(RelayConfig relayConfig, String partition)
- {
- this.partition = partition;
- this.currentRelay = relayConfig;
- }
-
- public void stop()
- {
- if (currentRelay != null)
- {
- logger.info("RelayConsumer stopping listening from relay "
- + currentRelay.getMaster());
- }
- }
-
- public boolean isPointingTo(RelayConfig relayConfig)
- {
- return false;
- }
-
- public void start()
- {
- if (currentRelay != null)
- {
- logger.info("RelayConsumer starting listening from relay "
- + currentRelay.getMaster());
- }
- }
-
- /*
- * This is required at relayConsumer to reach out relays which are hosting
- * data for slaved partitions.
- */
- void getRelaysForPartition(Integer partitionId)
- {
-
- }
-
- public long getHwm()
- {
- // TODO this is supposed to return the last checkpoint from this
- // consumer
- return 0;
- }
-
- public String getPartition()
- {
- // TODO Auto-generated method stub
- return partition;
- }
-
- public Object getCurrentRelay()
- {
- // TODO Auto-generated method stub
- return currentRelay;
- }
-
- public synchronized void setConfig(RelayConfig relayConfig)
- {
- // TODO Auto-generated method stub
- currentRelay = relayConfig;
- logger.info("Setting config to relay " + relayConfig.getMaster());
- }
-
- public void flush()
- {
- assert (currentRelay != null);
- logger.info("Consumer flushing for partition " + partition);
- if (currentRelay == null || currentRelay.getRelays() == null)
- return;
-
- for (String relay : currentRelay.getRelays())
- {
- logger.info("Reading (flush) from relay " + relay);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
deleted file mode 100644
index 0541f4b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/ClusterController.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.linkedin.helix.mock.controller;
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.participant.DistClusterControllerStateModelFactory;
-import com.linkedin.helix.participant.StateMachineEngine;
-
-public class ClusterController extends Thread
-{
- private static Logger LOG =
- Logger.getLogger(ClusterController.class);
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
- private final String _controllerMode;
- private final String _zkAddr;
-
- private HelixManager _manager;
-
- public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
- {
- this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
- }
-
- public ClusterController(String clusterName,
- String controllerName,
- String zkAddr,
- String controllerMode) throws Exception
- {
- _controllerMode = controllerMode;
- _zkAddr = zkAddr;
-
- if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
- {
- _manager =
- HelixManagerFactory.getZKHelixManager(clusterName,
- controllerName,
- InstanceType.CONTROLLER,
- zkAddr);
- }
- else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
- {
- _manager =
- HelixManagerFactory.getZKHelixManager(clusterName,
- controllerName,
- InstanceType.CONTROLLER_PARTICIPANT,
- zkAddr);
-
- }
- else
- {
- throw new IllegalArgumentException("Controller mode: " + controllerMode
- + " NOT recoginized");
- }
- }
-
- public HelixManager getManager()
- {
- return _manager;
- }
-
- public void syncStop()
- {
- if (_manager == null)
- {
- LOG.warn("manager already stopped");
- return;
- }
-
- _stopCountDown.countDown();
- try
- {
- _waitStopFinishCountDown.await();
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void syncStart()
- {
- // TODO: prevent start multiple times
-
- super.start();
- try
- {
- _startCountDown.await();
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void run()
- {
- try
- {
- try
- {
- if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
- {
- _manager.connect();
- }
- else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
- {
- DistClusterControllerStateModelFactory stateModelFactory =
- new DistClusterControllerStateModelFactory(_zkAddr);
-
- StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- _manager.connect();
- }
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally
- {
- _startCountDown.countDown();
- _stopCountDown.await();
- }
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally
- {
- synchronized (_manager)
- {
- _manager.disconnect();
- _manager = null;
- }
- _waitStopFinishCountDown.countDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
deleted file mode 100644
index d67bd40..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockController.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.controller;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.util.HelixUtil;
-
-public class MockController
-{
- private final ZkClient client;
- private final String srcName;
- private final String clusterName;
-
- public MockController(String src, String zkServer, String cluster)
- {
- srcName = src;
- clusterName = cluster;
- client = new ZkClient(zkServer);
- client.setZkSerializer(new ZNRecordSerializer());
- }
-
- void sendMessage(String msgId, String instanceName, String fromState,
- String toState, String partitionKey, int partitionId)
- throws InterruptedException, JsonGenerationException,
- JsonMappingException, IOException
- {
- Message message = new Message(MessageType.STATE_TRANSITION, msgId);
- message.setMsgId(msgId);
- message.setSrcName(srcName);
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setFromState(fromState);
- message.setToState(toState);
- // message.setPartitionId(partitionId);
- message.setPartitionName(partitionKey);
-
- String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
- + message.getId();
- ObjectMapper mapper = new ObjectMapper();
- StringWriter sw = new StringWriter();
- mapper.writeValueUsingView(sw, message, Message.class);
- System.out.println(sw.toString());
- client.delete(path);
-
- Thread.sleep(10000);
- ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
- instanceName));
- message.setTgtSessionId(record.getSimpleField(
- LiveInstanceProperty.SESSION_ID.toString()).toString());
- client.createPersistent(path, message);
- }
-
- public void createExternalView(List<String> instanceNames, int partitions,
- int replicas, String dbName, long randomSeed)
- {
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
- Builder keyBuilder = accessor.keyBuilder();
-
- ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
- replicas, dbName, randomSeed));
-
- accessor.setProperty(keyBuilder.externalView(dbName), externalView);
- }
-
- public ZNRecord computeRoutingTable(List<String> instanceNames,
- int partitions, int replicas, String dbName, long randomSeed)
- {
- assert (instanceNames.size() > replicas);
- Collections.sort(instanceNames);
-
- ZNRecord result = new ZNRecord(dbName);
-
- Map<String, Object> externalView = new TreeMap<String, Object>();
-
- List<Integer> partitionList = new ArrayList<Integer>(partitions);
- for (int i = 0; i < partitions; i++)
- {
- partitionList.add(new Integer(i));
- }
- Random rand = new Random(randomSeed);
- // Shuffle the partition list
- Collections.shuffle(partitionList, rand);
-
- for (int i = 0; i < partitionList.size(); i++)
- {
- int partitionId = partitionList.get(i);
- Map<String, String> partitionAssignment = new TreeMap<String, String>();
- int masterNode = i % instanceNames.size();
- // the first in the list is the node that contains the master
- partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
-
- // for the jth replica, we put it on (masterNode + j) % nodes-th
- // node
- for (int j = 1; j <= replicas; j++)
- {
- partitionAssignment
- .put(instanceNames.get((masterNode + j) % instanceNames.size()),
- "SLAVE");
- }
- String partitionName = dbName + ".partition-" + partitionId;
- result.setMapField(partitionName, partitionAssignment);
- }
- result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
deleted file mode 100644
index 5d2318b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/controller/MockControllerProcess.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.controller;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
-public class MockControllerProcess
-{
-
- /**
- * @param args
- * @throws IOException
- * @throws JsonMappingException
- * @throws JsonGenerationException
- * @throws InterruptedException
- */
- public static void main(String[] args) throws JsonGenerationException,
- JsonMappingException, InterruptedException, IOException
- {
-
- MockController storageController = new MockController("cm-instance-0",
- "localhost:2181", "storage-cluster");
- MockController relayController = new MockController("cm-instance-0",
- "localhost:2181", "relay-cluster");
-
- ArrayList<String> instanceNames = new ArrayList<String>();
- instanceNames.add("relay0");
- instanceNames.add("relay1");
- instanceNames.add("relay2");
- instanceNames.add("relay3");
- instanceNames.add("relay4");
-
- relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 0);
-
- // Messages to initiate offline->slave->master->slave transitions
-
- storageController.sendMessage("TestMessageId1", "localhost_8900",
- "Offline", "Slave", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- storageController.sendMessage("TestMessageId2", "localhost_8900", "Slave",
- "Master", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- storageController.sendMessage("TestMessageId3", "localhost_8900", "Master",
- "Slave", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
-
- // Change the external view to trigger the consumer to listen from
- // another relay
- relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 10);
-
- storageController.sendMessage("TestMessageId4", "localhost_8900", "Slave",
- "Offline", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- }
-
-}