You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[38/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestGroupMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestGroupMessage.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestGroupMessage.java
deleted file mode 100644
index b880cdd..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestGroupMessage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestGroupMessage extends ZkIntegrationTestBase
-{
- class TestZkChildListener implements IZkChildListener
- {
- int _maxNbOfChilds = 0;
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
- {
- System.out.println(parentPath + " has " + currentChilds.size() + " messages");
- if (currentChilds.size() > _maxNbOfChilds)
- {
- _maxNbOfChilds = currentChilds.size();
- }
- }
-
- }
-
- @Test
- public void testBasic() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // enable group message
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
- idealState.setGroupMessageMode(true);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- // registry a message listener so we know how many message generated
- TestZkChildListener listener = new TestZkChildListener();
- _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
-
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
- Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-
- // a non-group-message run followed by a group-message-enabled run
- @Test
- public void testChangeGroupMessageMode() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 32, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave",
- true); // do rebalance
-
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // stop all participants
- Thread.sleep(1000);
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- // enable group message
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
- idealState.setGroupMessageMode(true);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- // registry a message listener so we know how many message generated
- TestZkChildListener listener = new TestZkChildListener();
- _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
-
- // restart all participants
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
- Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
-
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < n; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixCustomCodeRunner.java
deleted file mode 100644
index 8c32cce..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixCustomCodeRunner.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.mock.storage.MockJobIntf;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.participant.CustomCodeCallbackHandler;
-import com.linkedin.helix.participant.HelixCustomCodeRunner;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase
-{
- private final String _clusterName = "CLUSTER_" + getShortClassName();
- private final int _nodeNb = 5;
- private final int _startPort = 12918;
- private final MockCallback _callback = new MockCallback();
-
- class MockCallback implements CustomCodeCallbackHandler
- {
- boolean _isCallbackInvoked;
-
- @Override
- public void onCallback(NotificationContext context)
- {
- HelixManager manager = context.getManager();
- Type type = context.getType();
- _isCallbackInvoked = true;
-// System.out.println(type + ": TestCallback invoked on " + manager.getInstanceName());
- }
-
- }
-
- class MockJob implements MockJobIntf
- {
- @Override
- public void doPreConnectJob(HelixManager manager)
- {
- try
- {
- // delay the start of the 1st participant
- // so there will be a leadership transfer from localhost_12919 to 12918
- if (manager.getInstanceName().equals("localhost_12918"))
- {
- Thread.sleep(2000);
- }
-
- HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
- customCodeRunner.invoke(_callback)
- .on(ChangeType.LIVE_INSTANCE)
- .usingLeaderStandbyModel("TestParticLeader")
- .start();
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void doPostConnectJob(HelixManager manager)
- {
- // TODO Auto-generated method stub
-
- }
-
- }
-
- @Test
- public void testCustomCodeRunner() throws Exception
- {
- System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(_clusterName,
- ZK_ADDR,
- _startPort,
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resourceNb
- 5, // partitionNb
- _nodeNb, // nodesNb
- _nodeNb, // replica
- "MasterSlave",
- true);
-
- TestHelper.startController(_clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
-
- MockParticipant[] partics = new MockParticipant[5];
- for (int i = 0; i < _nodeNb; i++)
- {
- String instanceName = "localhost_" + (_startPort + i);
-
- partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR,
- null, new MockJob());
- partics[i].syncStart();
-// new Thread(partics[i]).start();
- }
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
- Assert.assertTrue(result);
-
- Thread.sleep(1000); // wait for the INIT type callback to finish
- Assert.assertTrue(_callback._isCallbackInvoked);
- _callback._isCallbackInvoked = false;
-
- // add a new live instance
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
- newLiveIns.setHelixVersion("0.0.0");
- newLiveIns.setSessionId("randomSessionId");
- accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
-
- Thread.sleep(1000); // wait for the CALLBACK type callback to finish
- Assert.assertTrue(_callback._isCallbackInvoked);
-
- System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixUsingDifferentParams.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixUsingDifferentParams.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixUsingDifferentParams.java
deleted file mode 100644
index c9337ce..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestHelixUsingDifferentParams.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.annotations.Test;
-
-public class TestHelixUsingDifferentParams extends ZkIntegrationTestBase
-{
- private static Logger LOG = Logger.getLogger(TestHelixUsingDifferentParams.class);
-
- @Test()
- public void testCMUsingDifferentParams() throws Exception
- {
- System.out.println("START " + getShortClassName() + " at "
- + new Date(System.currentTimeMillis()));
-
- int numResourceArray[] = new int[] { 1 }; // , 2}; // , 3, 6};
- int numPartitionsPerResourceArray[] = new int[] { 10 }; // , 20, 50, 100}; // ,
- // 1000};
- int numInstances[] = new int[] { 5 }; // , 10}; // , 50, 100, 1000};
- int replicas[] = new int[] { 2 }; // , 3}; //, 4, 5};
-
- for (int numResources : numResourceArray)
- {
- for (int numPartitionsPerResource : numPartitionsPerResourceArray)
- {
- for (int numInstance : numInstances)
- {
- for (int replica : replicas)
- {
- String uniqClusterName = "TestDiffParam_" + "rg" + numResources + "_p"
- + numPartitionsPerResource + "_n" + numInstance + "_r" + replica;
- System.out.println("START " + uniqClusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestDriver.setupCluster(uniqClusterName, ZK_ADDR, numResources,
- numPartitionsPerResource, numInstance, replica);
-
- for (int i = 0; i < numInstance; i++)
- {
- TestDriver.startDummyParticipant(uniqClusterName, i);
- }
-
- TestDriver.startController(uniqClusterName);
- TestDriver.verifyCluster(uniqClusterName, 1000, 50 * 1000);
- TestDriver.stopCluster(uniqClusterName);
-
- System.out.println("END " + uniqClusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
- }
- }
- }
-
- System.out
- .println("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagePartitionStateMismatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagePartitionStateMismatch.java
deleted file mode 100644
index a304d3a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagePartitionStateMismatch.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase
-{
- @Test
- public void testStateMismatch() throws InterruptedException
- {
- String controllerName = CONTROLLER_PREFIX + "_0";
-
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
- Map<String, LiveInstance> liveinstanceMap = accessor.getChildValuesMap(accessor.keyBuilder().liveInstances());
-
- for(String instanceName : liveinstanceMap.keySet())
- {
- String sessionid = liveinstanceMap.get(instanceName).getSessionId();
- for(String partition : ev.getPartitionSet())
- {
- if(ev.getStateMap(partition).containsKey(instanceName))
- {
- String uuid = UUID.randomUUID().toString();
- Message message = new Message(MessageType.STATE_TRANSITION, uuid);
- boolean rand = new Random().nextInt(10) > 5;
- if(ev.getStateMap(partition).get(instanceName).equals("MASTER"))
- {
- message.setSrcName(manager.getInstanceName());
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionName(partition);
- message.setResourceName(TEST_DB);
- message.setFromState(rand ? "SLAVE" : "OFFLINE");
- message.setToState(rand ? "MASTER" : "SLAVE");
- message.setTgtSessionId(sessionid);
- message.setSrcSessionId(manager.getSessionId());
- message.setStateModelDef("MasterSlave");
- message.setStateModelFactoryName("DEFAULT");
- }
- else if (ev.getStateMap(partition).get(instanceName).equals("SLAVE"))
- {
- message.setSrcName(manager.getInstanceName());
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionName(partition);
- message.setResourceName(TEST_DB);
- message.setFromState(rand ? "MASTER" : "OFFLINE");
- message.setToState(rand ? "SLAVE" : "SLAVE");
- message.setTgtSessionId(sessionid);
- message.setSrcSessionId(manager.getSessionId());
- message.setStateModelDef("MasterSlave");
- message.setStateModelFactoryName("DEFAULT");
- }
- accessor.setProperty(accessor.keyBuilder().message(instanceName, message.getMsgId()), message);
- }
- }
- }
- Thread.sleep(3000);
- ExternalView ev2 = accessor.getProperty(kb.externalView(TEST_DB));
- Assert.assertTrue(ev.equals(ev2));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestMessageThrottle.java
deleted file mode 100644
index cf21d51..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessageThrottle.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZKHelixAdmin;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-
-public class TestMessageThrottle extends ZkIntegrationTestBase
-{
- @Test()
- public void testMessageThrottle() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
-
- String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
- // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
- // port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // setup message constraint
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,CONSTRAINT_VALUE=1";
- HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
- Map<String, String> constraints = new TreeMap<String, String>();
- constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
- // constraints.put("TRANSITION", "OFFLINE-SLAVE");
- constraints.put("CONSTRAINT_VALUE", "1");
- constraints.put("INSTANCE", ".*");
- admin.addMessageConstraint(clusterName, "constraint1", constraints);
-
-
- final ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
- // make sure we never see more than 1 state transition message for each participant
- final AtomicBoolean success = new AtomicBoolean(true);
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
- String msgPath =
- PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName);
-
- _gZkClient.subscribeChildChanges(msgPath, new IZkChildListener()
- {
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
- {
- if (currentChilds != null && currentChilds.size() > 1)
- {
- List<ZNRecord> records = accessor.getBaseDataAccessor().getChildren(parentPath, null, 0);
- int transitionMsgCount = 0;
- for (ZNRecord record : records)
- {
- Message msg = new Message(record);
- if(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
- {
- transitionMsgCount++;
- }
- }
-
- if (transitionMsgCount > 1)
- {
- success.set(false);
- Assert.fail("Should not see more than 1 message");
- }
- }
-
-
- }
- });
- }
-
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- Assert.assertTrue(success.get());
-
-
- // clean up
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagingService.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagingService.java
deleted file mode 100644
index 264bc2f..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestMessagingService.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.HashSet;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.Criteria.DataSource;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck
-{
- public static class TestMessagingHandlerFactory implements
- MessageHandlerFactory
- {
- public static HashSet<String> _processedMsgIds = new HashSet<String>();
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- return new TestMessagingHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- return "TestExtensibility";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
-
- }
-
- public static class TestMessagingHandler extends MessageHandler
- {
- public TestMessagingHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- Thread.sleep(1000);
- System.out.println("TestMessagingHandler " + _message.getMsgId());
- _processedMsgIds.add(_message.getRecord().getSimpleField(
- "TestMessagingPara"));
- result.getTaskResultMap().put("ReplyMessage", "TestReplyMessage");
- return result;
- }
-
-
- @Override
- public void onError( Exception e, ErrorCode code, ErrorType type)
- {
- // TODO Auto-generated method stub
-
- }
- }
- }
-
- @Test()
- public void TestMessageSimpleSend() throws Exception
- {
- String hostSrc = "localhost_" + START_PORT;
- String hostDest = "localhost_" + (START_PORT + 1);
-
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(factory.getMessageType(),msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName(hostDest);
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
-
- int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
- AssertJUnit.assertTrue(nMsgs == 1);
- Thread.sleep(2500);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds
- .contains(para));
-
- cr = new Criteria();
- cr.setInstanceName(hostDest);
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setDataSource(DataSource.IDEALSTATES);
-
- nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
- AssertJUnit.assertTrue(nMsgs == 1);
- Thread.sleep(2500);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds
- .contains(para));
-
- }
-
- public static class MockAsyncCallback extends AsyncCallback
- {
-
- public MockAsyncCallback()
- {
- }
-
- @Override
- public void onTimeOut()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- // TODO Auto-generated method stub
-
- }
-
- }
-
- public static class TestAsyncCallback extends AsyncCallback
- {
- public TestAsyncCallback(long timeout)
- {
- super(timeout);
- }
-
- static HashSet<String> _replyedMessageContents = new HashSet<String>();
- public boolean timeout = false;
-
- @Override
- public void onTimeOut()
- {
- timeout = true;
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- // TODO Auto-generated method stub
- System.out.println("OnreplyMessage: "
- + message.getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage"));
- if(message.getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage") == null)
- {
- int x = 0;
- x++;
- }
- _replyedMessageContents.add(message.getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage"));
- }
-
- }
-
- @Test()
- public void TestMessageSimpleSendReceiveAsync() throws Exception
- {
- String hostSrc = "localhost_" + START_PORT;
- String hostDest = "localhost_" + (START_PORT + 1);
-
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(factory.getMessageType(),msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
-
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName(hostDest);
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
-
- TestAsyncCallback callback = new TestAsyncCallback(60000);
-
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
-
- Thread.sleep(2000);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents
- .contains("TestReplyMessage"));
- AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
-
- TestAsyncCallback callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
-
- Thread.sleep(3000);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(callback2.isTimedOut());
-
- cr = new Criteria();
- cr.setInstanceName(hostDest);
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setDataSource(DataSource.IDEALSTATES);
-
- callback = new TestAsyncCallback(60000);
-
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
-
- Thread.sleep(2000);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents
- .contains("TestReplyMessage"));
- AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
-
- callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
-
- Thread.sleep(3000);
- // Thread.currentThread().join();
- AssertJUnit.assertTrue(callback2.isTimedOut());
-
- }
-
- @Test()
- public void TestBlockingSendReceive() throws Exception
- {
- String hostSrc = "localhost_" + START_PORT;
- String hostDest = "localhost_" + (START_PORT + 1);
-
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
-
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(factory.getMessageType(),msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
-
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName(hostDest);
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
-
- AsyncCallback asyncCallback = new MockAsyncCallback();
- int messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, asyncCallback, 60000);
-
- AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage").equals("TestReplyMessage"));
- AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
-
-
- AsyncCallback asyncCallback2 = new MockAsyncCallback();
- messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, asyncCallback2, 500);
- AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
-
- }
-
- @Test()
- public void TestMultiMessageCriteria() throws Exception
- {
- String hostSrc = "localhost_" + START_PORT;
-
- for (int i = 0; i < NODE_NR; i++)
- {
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
- }
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(
- new TestMessagingHandlerFactory().getMessageType(),msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
-
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName("%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback1, 2000);
-
- AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage").equals("TestReplyMessage"));
- AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
-
- AsyncCallback callback2 = new MockAsyncCallback();
- int messageSent2 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback2, 500);
- AssertJUnit.assertTrue(callback2.isTimedOut());
-
- cr.setPartition("TestDB_17");
- AsyncCallback callback3 = new MockAsyncCallback();
- int messageSent3 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback3, 2000);
- AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
-
-
- cr.setPartition("TestDB_15");
- AsyncCallback callback4 = new MockAsyncCallback();
- int messageSent4 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback4, 2000);
- AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
-
- cr.setPartitionState("SLAVE");
- AsyncCallback callback5 = new MockAsyncCallback();
- int messageSent5 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback5, 2000);
- AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
-
- cr.setDataSource(DataSource.IDEALSTATES);
- AsyncCallback callback6 = new MockAsyncCallback();
- int messageSent6 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback6, 2000);
- AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
- }
-
- @Test()
- public void sendSelfMsg()
- {
- String hostSrc = "localhost_" + START_PORT;
-
- for (int i = 0; i < NODE_NR; i++)
- {
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
- }
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(
- new TestMessagingHandlerFactory().getMessageType(),msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
-
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName("%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setSelfExcluded(false);
- AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback1, 3000);
-
- AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
- AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ReplyMessage").equals("TestReplyMessage"));
- }
-
- @Test()
- public void TestControllerMessage() throws Exception
- {
- String hostSrc = "localhost_" + START_PORT;
-
- for (int i = 0; i < NODE_NR; i++)
- {
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
- }
- String msgId = new UUID(123, 456).toString();
- Message msg = new Message(MessageType.CONTROLLER_MSG,msgId);
- msg.setMsgId(msgId);
- msg.setSrcName(hostSrc);
-
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
- String para = "Testing messaging para";
- msg.getRecord().setSimpleField("TestMessagingPara", para);
-
- Criteria cr = new Criteria();
- cr.setInstanceName("*");
- cr.setRecipientInstanceType(InstanceType.CONTROLLER);
- cr.setSessionSpecific(false);
-
- AsyncCallback callback1 = new MockAsyncCallback();
- int messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback1, 2000);
-
- AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ControllerResult").indexOf(hostSrc) != -1);
- AssertJUnit.assertTrue(callback1.getMessageReplied().size() == 1);
-
- msgId = UUID.randomUUID().toString();
- msg.setMsgId(msgId);
- cr.setPartition("TestDB_17");
- AsyncCallback callback2 = new MockAsyncCallback();
- messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback2, 2000);
- AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ControllerResult").indexOf(hostSrc) != -1);
-
- AssertJUnit.assertTrue(callback2.getMessageReplied().size() == 1);
-
- msgId = UUID.randomUUID().toString();
- msg.setMsgId(msgId);
- cr.setPartitionState("SLAVE");
- AsyncCallback callback3 = new MockAsyncCallback();
- messagesSent = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
- .sendAndWait(cr, msg, callback3, 2000);
- AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
- .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
- .get("ControllerResult").indexOf(hostSrc) != -1);
-
- AssertJUnit.assertTrue(callback3.getMessageReplied().size() == 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestNonOfflineInitState.java
deleted file mode 100644
index bf3d6c3..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestNonOfflineInitState.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.MockBootstrapModelFactory;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestNonOfflineInitState extends ZkIntegrationTestBase
-{
- private static Logger LOG = Logger.getLogger(TestNonOfflineInitState.class);
-
- @Test
- public void testNonOfflineInitState() throws Exception
- {
- System.out.println("START testNonOfflineInitState at "
- + new Date(System.currentTimeMillis()));
- String clusterName = getShortClassName();
-
- setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 1, // replicas
- "Bootstrap",
- true); // do rebalance
-
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
-
- // start participants
- MockParticipant[] participants = new MockParticipant[5];
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-
- // add a state model with non-OFFLINE initial state
- StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
- MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
- stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
-
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- System.out.println("END testNonOfflineInitState at "
- + new Date(System.currentTimeMillis()));
- }
-
- private static void setupCluster(String clusterName,
- String ZkAddr,
- int startPort,
- String participantNamePrefix,
- String resourceNamePrefix,
- int resourceNb,
- int partitionNb,
- int nodesNb,
- int replica,
- String stateModelDef,
- boolean doRebalance) throws Exception
- {
- ZkClient zkClient = new ZkClient(ZkAddr);
- if (zkClient.exists("/" + clusterName))
- {
- LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
- zkClient.deleteRecursive("/" + clusterName);
- }
-
- ClusterSetup setupTool = new ClusterSetup(ZkAddr);
- setupTool.addCluster(clusterName, true);
- setupTool.addStateModelDef(clusterName,
- "Bootstrap",
- TestHelper.generateStateModelDefForBootstrap());
-
- for (int i = 0; i < nodesNb; i++)
- {
- int port = startPort + i;
- setupTool.addInstanceToCluster(clusterName, participantNamePrefix + ":" + port);
- }
-
- for (int i = 0; i < resourceNb; i++)
- {
- String dbName = resourceNamePrefix + i;
- setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef);
- if (doRebalance)
- {
- setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
- }
- }
- zkClient.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestNullReplica.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestNullReplica.java
deleted file mode 100644
index fff4bff..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestNullReplica.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestNullReplica extends ZkIntegrationTestBase
-{
-
- @Test
- public void testNullReplica() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- MockParticipant[] participants = new MockParticipant[5];
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
- // set replica in ideal state to null
- String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
- ZNRecord idealState = _gZkClient.readData(idealStatePath);
- idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
- _gZkClient.writeData(idealStatePath, idealState);
-
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // clean up
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- Thread.sleep(2000);
- controller.syncStop();
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantErrorMessage.java
deleted file mode 100644
index 55cff9a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantErrorMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
-import com.linkedin.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase
-{
- @Test()
- public void TestParticipantErrorMessageSend()
- {
- String participant1 = "localhost_" + START_PORT;
- String participant2 = "localhost_" + (START_PORT + 1);
-
- Message errorMessage1
- = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
- errorMessage1.setTgtSessionId("*");
- errorMessage1.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_INSTANCE.toString());
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
- recipientCriteria.setSessionSpecific(false);
- _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, errorMessage1);
-
- Message errorMessage2
- = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
- errorMessage2.setTgtSessionId("*");
- errorMessage2.setResourceName("TestDB");
- errorMessage2.setPartitionName("TestDB_14");
- errorMessage2.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_PARTITION.toString());
- Criteria recipientCriteria2 = new Criteria();
- recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
- recipientCriteria2.setSessionSpecific(false);
- _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, errorMessage2);
-
- try
- {
- Thread.sleep(1500);
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
- ExternalView externalView = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
-
- for(String partitionName : externalView.getRecord().getMapFields().keySet())
- {
- for(String hostName : externalView.getRecord().getMapField(partitionName).keySet())
- {
- if(hostName.equals(participant1))
- {
- Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName).equalsIgnoreCase("OFFLINE"));
- }
- }
- }
- Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2).equalsIgnoreCase("OFFLINE"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantNameCollision.java
deleted file mode 100644
index 190637a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestParticipantNameCollision.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-
-public class TestParticipantNameCollision extends ZkStandAloneCMTestBase
-{
- private static Logger logger = Logger.getLogger(TestParticipantNameCollision.class);
-
- @Test()
- public void testParticiptantNameCollision() throws Exception
- {
- logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
-
- StartCMResult result = null;
- for (int i = 0; i < 1; i++)
- {
- String instanceName = "localhost_" + (START_PORT + i);
- try
- {
- // the call fails on getClusterManagerForParticipant()
- // no threads start
- result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
-
- Thread.sleep(30000);
- TestHelper.verifyWithTimeout("verifyNotConnected", result._manager);
-
- logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestPauseSignal.java
deleted file mode 100644
index 8afe53e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestPauseSignal.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.PauseSignal;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestPauseSignal extends ZkIntegrationTestBase
-{
- @Test()
- public void testPauseSignal() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- MockParticipant[] participants = new MockParticipant[5];
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // start controller
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // pause the cluster and make sure pause is persistent
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- final HelixDataAccessor tmpAccessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
- String cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " false";
- ClusterSetup.processCommandLineArgs(cmd.split(" "));
-
- tmpAccessor.setProperty(tmpAccessor.keyBuilder().pause(), new PauseSignal("pause"));
- zkClient.close();
-
- // wait for controller to be signaled by pause
- Thread.sleep(1000);
-
- // add a new resource group
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
- setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
-
- // make sure TestDB1 external view is empty
- TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
- 1000,
- clusterName,
- "TestDB1",
- TestHelper.<String> setOf("localhost_12918",
- "localhost_12919",
- "localhost_12920",
- "localhost_12921",
- "localhost_12922"),
- ZK_ADDR);
-
- // resume controller
- final HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
- cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " true";
- ClusterSetup.processCommandLineArgs(cmd.split(" "));
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // clean up
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- Thread.sleep(2000);
- controller.syncStop();
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestRenamePartition.java
deleted file mode 100644
index c35ec2a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestRenamePartition.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestRenamePartition extends ZkIntegrationTestBase
-{
- @Test()
- public void testRenamePartitionAutoIS() throws Exception
- {
- String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
-
-
- startAndVerify(clusterName);
-
- // rename partition name TestDB0_0 tp TestDB0_100
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
-
- List<String> prioList = idealState.getRecord().getListFields().remove("TestDB0_0");
- idealState.getRecord().getListFields().put("TestDB0_100", prioList);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
- Assert.assertTrue(result);
-
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- }
-
- @Test()
- public void testRenamePartitionCustomIS() throws Exception
- {
-
- String clusterName = "CLUSTER_" + getShortClassName() + "_custom";
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave", false); // do rebalance
-
- // calculate idealState
- List<String> instanceNames = Arrays.asList("localhost_12918", "localhost_12919", "localhost_12920",
- "localhost_12921", "localhost_12922");
- ZNRecord destIS = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
- 10, 3-1, "TestDB0", "MASTER", "SLAVE");
- IdealState idealState = new IdealState(destIS);
- idealState.setIdealStateMode("CUSTOMIZED");
- idealState.setReplicas("3");
- idealState.setStateModelDefRef("MasterSlave");
-
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- startAndVerify(clusterName);
-
- Map<String, String> stateMap = idealState.getRecord().getMapFields().remove("TestDB0_0");
- idealState.getRecord().getMapFields().put("TestDB0_100", stateMap);
- accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
- Assert.assertTrue(result);
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- }
-
- private void startAndVerify(String clusterName) throws Exception
- {
- MockParticipant[] participants = new MockParticipant[5];
-
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
-
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
-// new Thread(participants[i]).start();
- }
-
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
- Assert.assertTrue(result);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetInstance.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestResetInstance.java
deleted file mode 100644
index 94d2ea5..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetInstance.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.ErrTransition;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResetInstance extends ZkIntegrationTestBase
-{
-
- @Test
- public void testResetInstance() throws Exception
- {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 5;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // start controller
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
- {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
-
- // start mock participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- if (i == 0)
- {
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- new ErrTransition(errPartitions));
- }
- else
- {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- }
- participants[i].syncStart();
- }
-
- // verify cluster
- Map<String, Map<String, String>> errStateMap =
- new HashMap<String, Map<String, String>>();
- errStateMap.put("TestDB0", new HashMap<String, String>());
- errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
- errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
- boolean result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName,
- errStateMap)));
- Assert.assertTrue(result, "Cluster verification fails");
-
- // reset node "localhost_12918"
- participants[0].setTransition(null);
- String command =
- "--zkSvr " + ZK_ADDR + " --resetInstance " + clusterName
- + " localhost_12918";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-
- result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName)));
- Assert.assertTrue(result, "Cluster verification fails");
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestResetPartitionState.java
deleted file mode 100644
index eac7285..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetPartitionState.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.ErrTransition;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResetPartitionState extends ZkIntegrationTestBase
-{
- int _errToOfflineInvoked = 0;
-
- class ErrTransitionWithResetCnt extends ErrTransition
- {
- public ErrTransitionWithResetCnt(Map<String, Set<String>> errPartitions)
- {
- super(errPartitions);
- }
-
- @Override
- public void doTransition(Message message, NotificationContext context)
- {
- // System.err.println("doReset() invoked");
- super.doTransition(message, context);
- String fromState = message.getFromState();
- String toState = message.getToState();
- if (fromState.equals("ERROR") && toState.equals("OFFLINE"))
- {
- _errToOfflineInvoked++;
- }
- }
-
- }
-
- @Test()
- public void testResetPartitionState() throws Exception
- {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 5;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // start controller
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
- {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
-
- // start mock participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- if (i == 0)
- {
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- new ErrTransition(errPartitions));
- }
- else
- {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- }
- participants[i].syncStart();
- }
-
- // verify cluster
- Map<String, Map<String, String>> errStateMap =
- new HashMap<String, Map<String, String>>();
- errStateMap.put("TestDB0", new HashMap<String, String>());
- errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
- errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
- boolean result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName,
- errStateMap)));
- Assert.assertTrue(result, "Cluster verification fails");
-
- // reset a non-exist partition, should throw exception
- try
- {
- String command =
- "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
- + " localhost_12918 TestDB0 TestDB0_nonExist";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Assert.fail("Should throw exception on reset a non-exist partition");
- }
- catch (Exception e)
- {
- // OK
- }
-
- // reset one error partition
- errPartitions.remove("SLAVE-MASTER");
- participants[0].setTransition(new ErrTransitionWithResetCnt(errPartitions));
- clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_4");
- _errToOfflineInvoked = 0;
- String command =
- "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
- + " localhost_12918 TestDB0 TestDB0_4";
-
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Thread.sleep(200); // wait reset to be done
- try
- {
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- Assert.fail("Should throw exception on reset a partition not in ERROR state");
- }
- catch (Exception e)
- {
- // OK
- }
-
- errStateMap.get("TestDB0").remove("TestDB0_4");
- result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName,
- errStateMap)));
- Assert.assertTrue(result, "Cluster verification fails");
- Assert.assertEquals(_errToOfflineInvoked, 1);
-
- // reset the other error partition
- participants[0].setTransition(new ErrTransitionWithResetCnt(null));
- clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_8");
-
- command =
- "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
- + " localhost_12918 TestDB0 TestDB0_8";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result, "Cluster verification fails");
- Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- private void clearStatusUpdate(String clusterName,
- String instance,
- String resource,
- String partition)
- {
- // clear status update for error partition so verify() will not fail on old
- // errors
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
- accessor.removeProperty(keyBuilder.stateTransitionStatus(instance,
- liveInstance.getSessionId(),
- resource,
- partition));
-
- }
- // TODO: throw exception in reset()
-}