You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[34/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
new file mode 100644
index 0000000..9bf79b8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockJobIntf;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase
+{
+ private final String _clusterName = "CLUSTER_" + getShortClassName();
+ private final int _nodeNb = 5;
+ private final int _startPort = 12918;
+ private final MockCallback _callback = new MockCallback();
+
+ class MockCallback implements CustomCodeCallbackHandler
+ {
+ boolean _isCallbackInvoked;
+
+ @Override
+ public void onCallback(NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ Type type = context.getType();
+ _isCallbackInvoked = true;
+// System.out.println(type + ": TestCallback invoked on " + manager.getInstanceName());
+ }
+
+ }
+
+ class MockJob implements MockJobIntf
+ {
+ @Override
+ public void doPreConnectJob(HelixManager manager)
+ {
+ try
+ {
+ // delay the start of the 1st participant
+ // so there will be a leadership transfer from localhost_12919 to 12918
+ if (manager.getInstanceName().equals("localhost_12918"))
+ {
+ Thread.sleep(2000);
+ }
+
+ HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
+ customCodeRunner.invoke(_callback)
+ .on(ChangeType.LIVE_INSTANCE)
+ .usingLeaderStandbyModel("TestParticLeader")
+ .start();
+ } catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void doPostConnectJob(HelixManager manager)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ @Test
+ public void testCustomCodeRunner() throws Exception
+ {
+ System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(_clusterName,
+ ZK_ADDR,
+ _startPort,
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resourceNb
+ 5, // partitionNb
+ _nodeNb, // nodesNb
+ _nodeNb, // replica
+ "MasterSlave",
+ true);
+
+ TestHelper.startController(_clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+
+ MockParticipant[] partics = new MockParticipant[5];
+ for (int i = 0; i < _nodeNb; i++)
+ {
+ String instanceName = "localhost_" + (_startPort + i);
+
+ partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR,
+ null, new MockJob());
+ partics[i].syncStart();
+// new Thread(partics[i]).start();
+ }
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+ Assert.assertTrue(result);
+
+ Thread.sleep(1000); // wait for the INIT type callback to finish
+ Assert.assertTrue(_callback._isCallbackInvoked);
+ _callback._isCallbackInvoked = false;
+
+ // add a new live instance
+ ZkClient zkClient = new ZkClient(ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
+ newLiveIns.setHelixVersion("0.0.0");
+ newLiveIns.setSessionId("randomSessionId");
+ accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
+
+ Thread.sleep(1000); // wait for the CALLBACK type callback to finish
+ Assert.assertTrue(_callback._isCallbackInvoked);
+
+ System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
new file mode 100644
index 0000000..dd74b68
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixUsingDifferentParams.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the dummy-participant cluster scenario over every combination of resource count,
+ * partitions per resource, instance count and replica count listed in the test method.
+ */
+public class TestHelixUsingDifferentParams extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestHelixUsingDifferentParams.class);
+
+  /**
+   * Iterates over the cartesian product of the parameter arrays and runs one cluster
+   * scenario per combination. Each array currently holds a single value.
+   *
+   * @throws Exception if any cluster operation fails
+   */
+  @Test()
+  public void testCMUsingDifferentParams() throws Exception
+  {
+    System.out.println("START " + getShortClassName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    int[] resourceCounts = { 1 };
+    int[] partitionCounts = { 10 };
+    int[] instanceCounts = { 5 };
+    int[] replicaCounts = { 2 };
+
+    for (int resources : resourceCounts)
+    {
+      for (int partitions : partitionCounts)
+      {
+        for (int instances : instanceCounts)
+        {
+          for (int replica : replicaCounts)
+          {
+            runScenario(resources, partitions, instances, replica);
+          }
+        }
+      }
+    }
+
+    System.out.println("END " + getShortClassName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Sets up one uniquely named cluster, starts its dummy participants and controller,
+   * verifies it and stops it again.
+   */
+  private void runScenario(int resources, int partitions, int instances, int replica)
+      throws Exception
+  {
+    String clusterName = "TestDiffParam_rg" + resources + "_p" + partitions
+        + "_n" + instances + "_r" + replica;
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestDriver.setupCluster(clusterName, ZK_ADDR, resources, partitions, instances, replica);
+
+    int started = 0;
+    while (started < instances)
+    {
+      TestDriver.startDummyParticipant(clusterName, started);
+      started++;
+    }
+
+    TestDriver.startController(clusterName);
+    TestDriver.verifyCluster(clusterName, 1000, 50 * 1000);
+    TestDriver.stopCluster(clusterName);
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
new file mode 100644
index 0000000..60cfe7b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
@@ -0,0 +1,78 @@
+package org.apache.helix.integration;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Sends state-transition messages whose from-state does not match the state currently
+ * reported in the external view, then checks that the external view is unchanged.
+ */
+public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase
+{
+  /**
+   * For every live instance and every partition it hosts as MASTER or SLAVE, writes one
+   * mismatching STATE_TRANSITION message, waits three seconds and compares the external
+   * view of {@code TEST_DB} before and after.
+   *
+   * @throws InterruptedException if the wait is interrupted
+   */
+  @Test
+  public void testStateMismatch() throws InterruptedException
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+    Map<String, LiveInstance> liveinstanceMap = accessor.getChildValuesMap(kb.liveInstances());
+
+    // one generator for the whole test instead of a new one per partition
+    Random random = new Random();
+
+    for (Map.Entry<String, LiveInstance> entry : liveinstanceMap.entrySet())
+    {
+      String instanceName = entry.getKey();
+      String sessionid = entry.getValue().getSessionId();
+      for (String partition : ev.getPartitionSet())
+      {
+        String currentState = ev.getStateMap(partition).get(instanceName);
+        if (currentState == null)
+        {
+          // this instance does not host the partition
+          continue;
+        }
+
+        boolean rand = random.nextInt(10) > 5;
+        String fromState;
+        String toState;
+        if (currentState.equals("MASTER"))
+        {
+          fromState = rand ? "SLAVE" : "OFFLINE";
+          toState = rand ? "MASTER" : "SLAVE";
+        }
+        else if (currentState.equals("SLAVE"))
+        {
+          fromState = rand ? "MASTER" : "OFFLINE";
+          toState = "SLAVE";
+        }
+        else
+        {
+          // no mismatching transition is defined for other states; do not write a
+          // message that has no target, partition or states set
+          continue;
+        }
+
+        String uuid = UUID.randomUUID().toString();
+        Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+        message.setSrcName(manager.getInstanceName());
+        message.setTgtName(instanceName);
+        message.setMsgState(MessageState.NEW);
+        message.setPartitionName(partition);
+        message.setResourceName(TEST_DB);
+        message.setFromState(fromState);
+        message.setToState(toState);
+        message.setTgtSessionId(sessionid);
+        message.setSrcSessionId(manager.getSessionId());
+        message.setStateModelDef("MasterSlave");
+        message.setStateModelFactoryName("DEFAULT");
+
+        accessor.setProperty(kb.message(instanceName, message.getMsgId()), message);
+      }
+    }
+    Thread.sleep(3000);
+    ExternalView ev2 = accessor.getProperty(kb.externalView(TEST_DB));
+    Assert.assertTrue(ev.equals(ev2),
+                      "External view changed after sending mismatching transition messages");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
new file mode 100644
index 0000000..45e948f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -0,0 +1,144 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that a message constraint with CONSTRAINT_VALUE=1 keeps every participant's
+ * message queue at no more than one pending state-transition message.
+ */
+public class TestMessageThrottle extends ZkIntegrationTestBase
+{
+  /**
+   * Sets up a MasterSlave cluster with a per-instance constraint of one state-transition
+   * message, watches each participant's message path while the cluster converges and fails
+   * if more than one state-transition message is ever observed under a path.
+   *
+   * @throws Exception if cluster setup or verification fails
+   */
+  @Test()
+  public void testMessageThrottle() throws Exception
+  {
+    final int nodeNb = 5;
+    final int startPort = 12918;
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[nodeNb];
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, startPort, // participant start port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            nodeNb, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // setup message constraint
+    // "MESSAGE_TYPE=STATE_TRANSITION,INSTANCE=.*,CONSTRAINT_VALUE=1";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    admin.addMessageConstraint(clusterName, "constraint1", constraints);
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+    // make sure we never see more than 1 state transition message for each participant
+    final AtomicBoolean success = new AtomicBoolean(true);
+    IZkChildListener throttleChecker = new IZkChildListener()
+    {
+      @Override
+      public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+      {
+        if (currentChilds == null || currentChilds.size() <= 1)
+        {
+          return;
+        }
+
+        List<ZNRecord> records = accessor.getBaseDataAccessor().getChildren(parentPath, null, 0);
+        int transitionMsgCount = 0;
+        for (ZNRecord record : records)
+        {
+          Message msg = new Message(record);
+          if (msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
+          {
+            transitionMsgCount++;
+          }
+        }
+
+        if (transitionMsgCount > 1)
+        {
+          // the flag carries the failure back to the test thread; the fail() below is
+          // raised on the listener's own thread
+          success.set(false);
+          Assert.fail("Should not see more than 1 message");
+        }
+      }
+    };
+
+    String[] msgPaths = new String[nodeNb];
+    for (int i = 0; i < nodeNb; i++)
+    {
+      String instanceName = "localhost_" + (startPort + i);
+      msgPaths[i] =
+          PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName);
+      _gZkClient.subscribeChildChanges(msgPaths[i], throttleChecker);
+    }
+
+    try
+    {
+      TestHelper.startController(clusterName,
+                                 "controller_0",
+                                 ZK_ADDR,
+                                 HelixControllerMain.STANDALONE);
+      // start participants
+      for (int i = 0; i < nodeNb; i++)
+      {
+        String instanceName = "localhost_" + (startPort + i);
+
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i].syncStart();
+      }
+
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                                clusterName));
+      Assert.assertTrue(result);
+
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                   clusterName));
+      Assert.assertTrue(result);
+
+      Assert.assertTrue(success.get());
+    }
+    finally
+    {
+      // clean up even when an assertion failed: drop the listeners from the shared
+      // client and stop whichever participants were started
+      for (String msgPath : msgPaths)
+      {
+        _gZkClient.unsubscribeChildChanges(msgPath, throttleChecker);
+      }
+      for (MockParticipant participant : participants)
+      {
+        if (participant != null)
+        {
+          participant.syncStop();
+        }
+      }
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
new file mode 100644
index 0000000..3dfb55d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -0,0 +1,478 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.HashSet;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ /**
+  * Factory producing handlers for messages of type "TestExtensibility".
+  */
+ public static class TestMessagingHandlerFactory implements MessageHandlerFactory
+ {
+   // Despite the name, this holds the "TestMessagingPara" field of each handled message.
+   public static HashSet<String> _processedMsgIds = new HashSet<String>();
+
+   @Override
+   public MessageHandler createHandler(Message message, NotificationContext context)
+   {
+     return new TestMessagingHandler(message, context);
+   }
+
+   @Override
+   public String getMessageType()
+   {
+     return "TestExtensibility";
+   }
+
+   @Override
+   public void reset()
+   {
+     // nothing to reset
+   }
+
+   /**
+    * Handler that sleeps for one second, records the message's "TestMessagingPara"
+    * field and answers with a fixed "ReplyMessage" entry.
+    */
+   public static class TestMessagingHandler extends MessageHandler
+   {
+     public TestMessagingHandler(Message message, NotificationContext context)
+     {
+       super(message, context);
+     }
+
+     @Override
+     public HelixTaskResult handleMessage() throws InterruptedException
+     {
+       HelixTaskResult taskResult = new HelixTaskResult();
+       taskResult.setSuccess(true);
+       Thread.sleep(1000);
+       System.out.println("TestMessagingHandler " + _message.getMsgId());
+       String para = _message.getRecord().getSimpleField("TestMessagingPara");
+       _processedMsgIds.add(para);
+       taskResult.getTaskResultMap().put("ReplyMessage", "TestReplyMessage");
+       return taskResult;
+     }
+
+     @Override
+     public void onError(Exception e, ErrorCode code, ErrorType type)
+     {
+       // errors are ignored by this test handler
+     }
+   }
+ }
+
+ /**
+  * Sends one message from the first participant to the second, once with the default
+  * criteria data source and once with IDEALSTATES, and checks that exactly one message
+  * is sent each time and that the handler recorded its parameter.
+  */
+ @Test()
+ public void TestMessageSimpleSend() throws Exception
+ {
+   String sender = "localhost_" + START_PORT;
+   String receiver = "localhost_" + (START_PORT + 1);
+
+   TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+   _startCMResultMap.get(receiver)._manager.getMessagingService()
+       .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+   String msgId = new UUID(123, 456).toString();
+   Message msg = new Message(factory.getMessageType(), msgId);
+   msg.setMsgId(msgId);
+   msg.setSrcName(sender);
+   msg.setTgtSessionId("*");
+   msg.setMsgState(MessageState.NEW);
+   String para = "Testing messaging para";
+   msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+   // first round: default data source
+   Criteria defaultCriteria = new Criteria();
+   defaultCriteria.setInstanceName(receiver);
+   defaultCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   defaultCriteria.setSessionSpecific(false);
+
+   // second round: same recipient, resolved against the ideal states
+   Criteria idealStateCriteria = new Criteria();
+   idealStateCriteria.setInstanceName(receiver);
+   idealStateCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   idealStateCriteria.setSessionSpecific(false);
+   idealStateCriteria.setDataSource(DataSource.IDEALSTATES);
+
+   for (Criteria criteria : new Criteria[] { defaultCriteria, idealStateCriteria })
+   {
+     int sentCount =
+         _startCMResultMap.get(sender)._manager.getMessagingService().send(criteria, msg);
+     AssertJUnit.assertTrue(sentCount == 1);
+     Thread.sleep(2500);
+     AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds.contains(para));
+   }
+ }
+
+ /**
+  * Callback with no behaviour of its own; both hooks are empty, so tests can only use
+  * what the {@link AsyncCallback} base class exposes.
+  */
+ public static class MockAsyncCallback extends AsyncCallback
+ {
+   public MockAsyncCallback()
+   {
+     super();
+   }
+
+   @Override
+   public void onReplyMessage(Message message)
+   {
+     // intentionally empty
+   }
+
+   @Override
+   public void onTimeOut()
+   {
+     // intentionally empty
+   }
+ }
+
+ /**
+  * Callback that records the "ReplyMessage" entry of every reply it receives and
+  * remembers whether its time-out hook was called.
+  */
+ public static class TestAsyncCallback extends AsyncCallback
+ {
+   /**
+    * @param timeout time-out value handed to the {@link AsyncCallback} constructor
+    */
+   public TestAsyncCallback(long timeout)
+   {
+     super(timeout);
+   }
+
+   // shared by all instances: "ReplyMessage" values seen so far
+   static HashSet<String> _replyedMessageContents = new HashSet<String>();
+   // set to true by onTimeOut()
+   public boolean timeout = false;
+
+   @Override
+   public void onTimeOut()
+   {
+     timeout = true;
+   }
+
+   /**
+    * Prints and stores the "ReplyMessage" entry of the reply's MESSAGE_RESULT map.
+    * A missing entry is stored as {@code null}, which the set accepts.
+    */
+   @Override
+   public void onReplyMessage(Message message)
+   {
+     // look the reply up once instead of re-evaluating the same chain three times
+     String reply = message.getRecord()
+         .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+         .get("ReplyMessage");
+     System.out.println("OnreplyMessage: " + reply);
+     _replyedMessageContents.add(reply);
+   }
+ }
+
+ /**
+  * Sends a message asynchronously from the first participant to the second. For both the
+  * default data source and IDEALSTATES it checks that a callback with a 60 s time-out
+  * receives exactly one reply and that a callback with a 500 ms time-out times out.
+  */
+ @Test()
+ public void TestMessageSimpleSendReceiveAsync() throws Exception
+ {
+   String sender = "localhost_" + START_PORT;
+   String receiver = "localhost_" + (START_PORT + 1);
+
+   // the factory is registered on both ends
+   TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+   _startCMResultMap.get(receiver)._manager.getMessagingService()
+       .registerMessageHandlerFactory(factory.getMessageType(), factory);
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+   String msgId = new UUID(123, 456).toString();
+   Message msg = new Message(factory.getMessageType(), msgId);
+   msg.setMsgId(msgId);
+   msg.setSrcName(sender);
+   msg.setTgtSessionId("*");
+   msg.setMsgState(MessageState.NEW);
+   String para = "Testing messaging para";
+   msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+   Criteria defaultCriteria = new Criteria();
+   defaultCriteria.setInstanceName(receiver);
+   defaultCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   defaultCriteria.setSessionSpecific(false);
+
+   Criteria idealStateCriteria = new Criteria();
+   idealStateCriteria.setInstanceName(receiver);
+   idealStateCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   idealStateCriteria.setSessionSpecific(false);
+   idealStateCriteria.setDataSource(DataSource.IDEALSTATES);
+
+   for (Criteria criteria : new Criteria[] { defaultCriteria, idealStateCriteria })
+   {
+     // generous time-out: the reply must arrive
+     TestAsyncCallback replyCallback = new TestAsyncCallback(60000);
+     _startCMResultMap.get(sender)._manager.getMessagingService()
+         .send(criteria, msg, replyCallback, 60000);
+
+     Thread.sleep(2000);
+     AssertJUnit.assertTrue(
+         TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
+     AssertJUnit.assertTrue(replyCallback.getMessageReplied().size() == 1);
+
+     // short time-out: the callback must report a time-out
+     TestAsyncCallback timeoutCallback = new TestAsyncCallback(500);
+     _startCMResultMap.get(sender)._manager.getMessagingService()
+         .send(criteria, msg, timeoutCallback, 500);
+
+     Thread.sleep(3000);
+     AssertJUnit.assertTrue(timeoutCallback.isTimedOut());
+   }
+ }
+
+ /**
+  * Uses the blocking {@code sendAndWait} call from the first participant to the second:
+  * with a 60 s time-out exactly one "TestReplyMessage" reply is expected, with a 500 ms
+  * time-out the callback is expected to time out.
+  */
+ @Test()
+ public void TestBlockingSendReceive() throws Exception
+ {
+   String sender = "localhost_" + START_PORT;
+   String receiver = "localhost_" + (START_PORT + 1);
+
+   TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+   _startCMResultMap.get(receiver)._manager.getMessagingService()
+       .registerMessageHandlerFactory(factory.getMessageType(), factory);
+
+   String msgId = new UUID(123, 456).toString();
+   Message msg = new Message(factory.getMessageType(), msgId);
+   msg.setMsgId(msgId);
+   msg.setSrcName(sender);
+   msg.setTgtSessionId("*");
+   msg.setMsgState(MessageState.NEW);
+   String para = "Testing messaging para";
+   msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+   Criteria criteria = new Criteria();
+   criteria.setInstanceName(receiver);
+   criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   criteria.setSessionSpecific(false);
+
+   // long time-out: wait for the reply and inspect it
+   AsyncCallback replyCallback = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, replyCallback, 60000);
+
+   String reply = replyCallback.getMessageReplied().get(0).getRecord()
+       .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+       .get("ReplyMessage");
+   AssertJUnit.assertTrue(reply.equals("TestReplyMessage"));
+   AssertJUnit.assertTrue(replyCallback.getMessageReplied().size() == 1);
+
+   // short time-out: the call is expected to give up first
+   AsyncCallback timeoutCallback = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, timeoutCallback, 500);
+   AssertJUnit.assertTrue(timeoutCallback.isTimedOut());
+ }
+
+ @Test()
+ public void TestMultiMessageCriteria() throws Exception
+ {
+ String hostSrc = "localhost_" + START_PORT;
+
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(factory.getMessageType(), factory);
+ }
+ String msgId = new UUID(123, 456).toString();
+ Message msg = new Message(
+ new TestMessagingHandlerFactory().getMessageType(),msgId);
+ msg.setMsgId(msgId);
+ msg.setSrcName(hostSrc);
+
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+ String para = "Testing messaging para";
+ msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+ Criteria cr = new Criteria();
+ cr.setInstanceName("%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ AsyncCallback callback1 = new MockAsyncCallback();
+ int messageSent1 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback1, 2000);
+
+ AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
+ .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+ .get("ReplyMessage").equals("TestReplyMessage"));
+ AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
+
+ AsyncCallback callback2 = new MockAsyncCallback();
+ int messageSent2 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback2, 500);
+ AssertJUnit.assertTrue(callback2.isTimedOut());
+
+ cr.setPartition("TestDB_17");
+ AsyncCallback callback3 = new MockAsyncCallback();
+ int messageSent3 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback3, 2000);
+ AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
+
+
+ cr.setPartition("TestDB_15");
+ AsyncCallback callback4 = new MockAsyncCallback();
+ int messageSent4 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback4, 2000);
+ AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
+
+ cr.setPartitionState("SLAVE");
+ AsyncCallback callback5 = new MockAsyncCallback();
+ int messageSent5 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback5, 2000);
+ AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
+
+ cr.setDataSource(DataSource.IDEALSTATES);
+ AsyncCallback callback6 = new MockAsyncCallback();
+ int messageSent6 = _startCMResultMap.get(hostSrc)._manager.getMessagingService()
+ .sendAndWait(cr, msg, callback6, 2000);
+ AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
+ }
+
+ /**
+  * Broadcasts to instance name "%" with self-exclusion switched off and expects a reply
+  * from every participant, the sender included.
+  */
+ @Test()
+ public void sendSelfMsg()
+ {
+   String sender = "localhost_" + START_PORT;
+
+   for (int node = 0; node < NODE_NR; node++)
+   {
+     String host = "localhost_" + (START_PORT + node);
+     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+     _startCMResultMap.get(host)._manager.getMessagingService()
+         .registerMessageHandlerFactory(factory.getMessageType(), factory);
+   }
+
+   String msgId = new UUID(123, 456).toString();
+   Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
+   msg.setMsgId(msgId);
+   msg.setSrcName(sender);
+   msg.setTgtSessionId("*");
+   msg.setMsgState(MessageState.NEW);
+   String para = "Testing messaging para";
+   msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+   Criteria criteria = new Criteria();
+   criteria.setInstanceName("%");
+   criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+   criteria.setSessionSpecific(false);
+   criteria.setSelfExcluded(false);
+
+   AsyncCallback everyNode = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, everyNode, 3000);
+
+   AssertJUnit.assertTrue(everyNode.getMessageReplied().size() == NODE_NR);
+   String firstReply = everyNode.getMessageReplied().get(0).getRecord()
+       .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+       .get("ReplyMessage");
+   AssertJUnit.assertTrue(firstReply.equals("TestReplyMessage"));
+ }
+
+ /**
+  * Sends a CONTROLLER_MSG to the controller three times (plain, with a partition set,
+  * and with a partition state set) and checks each time that exactly one reply arrives
+  * whose "ControllerResult" entry mentions the sending host.
+  */
+ @Test()
+ public void TestControllerMessage() throws Exception
+ {
+   String sender = "localhost_" + START_PORT;
+
+   for (int node = 0; node < NODE_NR; node++)
+   {
+     String host = "localhost_" + (START_PORT + node);
+     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+     _startCMResultMap.get(host)._manager.getMessagingService()
+         .registerMessageHandlerFactory(factory.getMessageType(), factory);
+   }
+
+   String msgId = new UUID(123, 456).toString();
+   Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
+   msg.setMsgId(msgId);
+   msg.setSrcName(sender);
+   msg.setTgtSessionId("*");
+   msg.setMsgState(MessageState.NEW);
+   String para = "Testing messaging para";
+   msg.getRecord().setSimpleField("TestMessagingPara", para);
+
+   Criteria criteria = new Criteria();
+   criteria.setInstanceName("*");
+   criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+   criteria.setSessionSpecific(false);
+
+   // round 1: no partition restriction
+   AsyncCallback plain = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, plain, 2000);
+   String plainResult = plain.getMessageReplied().get(0).getRecord()
+       .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+       .get("ControllerResult");
+   AssertJUnit.assertTrue(plainResult.indexOf(sender) != -1);
+   AssertJUnit.assertTrue(plain.getMessageReplied().size() == 1);
+
+   // round 2: fresh message id, partition set
+   msgId = UUID.randomUUID().toString();
+   msg.setMsgId(msgId);
+   criteria.setPartition("TestDB_17");
+   AsyncCallback withPartition = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, withPartition, 2000);
+   String partitionResult = withPartition.getMessageReplied().get(0).getRecord()
+       .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+       .get("ControllerResult");
+   AssertJUnit.assertTrue(partitionResult.indexOf(sender) != -1);
+   AssertJUnit.assertTrue(withPartition.getMessageReplied().size() == 1);
+
+   // round 3: fresh message id, partition state set as well
+   msgId = UUID.randomUUID().toString();
+   msg.setMsgId(msgId);
+   criteria.setPartitionState("SLAVE");
+   AsyncCallback withState = new MockAsyncCallback();
+   _startCMResultMap.get(sender)._manager.getMessagingService()
+       .sendAndWait(criteria, msg, withState, 2000);
+   String stateResult = withState.getMessageReplied().get(0).getRecord()
+       .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
+       .get("ControllerResult");
+   AssertJUnit.assertTrue(stateResult.indexOf(sender) != -1);
+   AssertJUnit.assertTrue(withState.getMessageReplied().size() == 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
new file mode 100644
index 0000000..636dd93
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -0,0 +1,113 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.MockBootstrapModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+ /**
+  * Integration test for a resource that uses the "Bootstrap" state model: the
+  * cluster is created, a controller and five mock participants are started, and
+  * the external view is expected to converge to the best possible state.
+  */
+ public class TestNonOfflineInitState extends ZkIntegrationTestBase
+ {
+   private static final Logger LOG = Logger.getLogger(TestNonOfflineInitState.class);
+
+   /**
+    * Starts a controller plus five participants that register a
+    * {@link MockBootstrapModelFactory} for the "Bootstrap" state model, then
+    * asserts that the cluster state verifier reports success.
+    *
+    * @throws Exception if cluster setup or participant start-up fails
+    */
+   @Test
+   public void testNonOfflineInitState() throws Exception
+   {
+     System.out.println("START testNonOfflineInitState at "
+         + new Date(System.currentTimeMillis()));
+     String clusterName = getShortClassName();
+
+     setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                  "localhost", // participant name prefix
+                  "TestDB", // resource name prefix
+                  1, // resources
+                  10, // partitions per resource
+                  5, // number of nodes
+                  1, // replicas
+                  "Bootstrap",
+                  true); // do rebalance
+
+     TestHelper.startController(clusterName,
+                                "controller_0",
+                                ZK_ADDR,
+                                HelixControllerMain.STANDALONE);
+
+     // start participants
+     MockParticipant[] participants = new MockParticipant[5];
+     for (int i = 0; i < 5; i++)
+     {
+       String instanceName = "localhost_" + (12918 + i);
+
+       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+
+       // add a state model with non-OFFLINE initial state
+       StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+       MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
+       stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+
+       participants[i].syncStart();
+     }
+
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                  clusterName));
+     Assert.assertTrue(result);
+
+     System.out.println("END testNonOfflineInitState at "
+         + new Date(System.currentTimeMillis()));
+   }
+
+   /**
+    * Creates a fresh cluster (deleting any existing one with the same name),
+    * registers the "Bootstrap" state model definition, and adds the requested
+    * instances and resources.
+    *
+    * @param clusterName           name of the cluster; also used as its ZooKeeper root path
+    * @param ZkAddr                ZooKeeper connect string
+    * @param startPort             port of the first instance; instance i uses startPort + i
+    * @param participantNamePrefix host part of each instance id
+    * @param resourceNamePrefix    resource i is named resourceNamePrefix + i
+    * @param resourceNb            number of resources to add
+    * @param partitionNb           number of partitions per resource
+    * @param nodesNb               number of instances to add
+    * @param replica               replica count passed to the rebalance call
+    * @param stateModelDef         state model definition name used by each resource
+    * @param doRebalance           whether to rebalance each resource after adding it
+    * @throws Exception if any cluster setup step fails
+    */
+   private static void setupCluster(String clusterName,
+                                    String ZkAddr,
+                                    int startPort,
+                                    String participantNamePrefix,
+                                    String resourceNamePrefix,
+                                    int resourceNb,
+                                    int partitionNb,
+                                    int nodesNb,
+                                    int replica,
+                                    String stateModelDef,
+                                    boolean doRebalance) throws Exception
+   {
+     ZkClient zkClient = new ZkClient(ZkAddr);
+     try
+     {
+       if (zkClient.exists("/" + clusterName))
+       {
+         LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+         zkClient.deleteRecursive("/" + clusterName);
+       }
+
+       ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+       setupTool.addCluster(clusterName, true);
+       // The definition registered here is always the Bootstrap one, independent
+       // of the stateModelDef argument used for the resources below.
+       setupTool.addStateModelDef(clusterName,
+                                  "Bootstrap",
+                                  TestHelper.generateStateModelDefForBootstrap());
+
+       for (int i = 0; i < nodesNb; i++)
+       {
+         int port = startPort + i;
+         setupTool.addInstanceToCluster(clusterName, participantNamePrefix + ":" + port);
+       }
+
+       for (int i = 0; i < resourceNb; i++)
+       {
+         String dbName = resourceNamePrefix + i;
+         setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef);
+         if (doRebalance)
+         {
+           setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+         }
+       }
+     }
+     finally
+     {
+       // Release the ZooKeeper connection even when a setup step throws.
+       zkClient.close();
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
new file mode 100644
index 0000000..33dd30c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -0,0 +1,78 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+ /**
+  * Integration test that removes the REPLICAS simple field from a resource's
+  * ideal state before the controller starts, and expects the cluster to still
+  * converge.
+  */
+ public class TestNullReplica extends ZkIntegrationTestBase
+ {
+
+   /**
+    * Sets up a five-node MasterSlave cluster, strips the replica count from the
+    * ideal state of TestDB0, starts controller and participants, and asserts
+    * that the cluster state verifier succeeds.
+    *
+    * @throws Exception if setup, start-up or shutdown fails
+    */
+   @Test
+   public void testNullReplica() throws Exception
+   {
+     // Logger.getRootLogger().setLevel(Level.INFO);
+     final String testClass = TestHelper.getTestClassName();
+     final String testMethod = TestHelper.getTestMethodName();
+     final String clusterName = testClass + "_" + testMethod;
+     final int basePort = 12918;
+     final int nodeCount = 5;
+
+     System.out.println("START " + clusterName + " at "
+         + new Date(System.currentTimeMillis()));
+
+     TestHelper.setupCluster(clusterName,
+                             ZK_ADDR,
+                             basePort,      // participant port
+                             "localhost",   // participant name prefix
+                             "TestDB",      // resource name prefix
+                             1,             // resources
+                             10,            // partitions per resource
+                             nodeCount,     // number of nodes
+                             3,             // replicas
+                             "MasterSlave",
+                             true);         // do rebalance
+
+     // set replica in ideal state to null
+     final String replicasKey = IdealState.IdealStateProperty.REPLICAS.toString();
+     final String idealStatePath =
+         PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+     ZNRecord idealStateRecord = _gZkClient.readData(idealStatePath);
+     idealStateRecord.getSimpleFields().remove(replicasKey);
+     _gZkClient.writeData(idealStatePath, idealStateRecord);
+
+     ClusterController controller =
+         new ClusterController(clusterName, "controller_0", ZK_ADDR);
+     controller.syncStart();
+
+     // start participants
+     MockParticipant[] nodes = new MockParticipant[nodeCount];
+     for (int idx = 0; idx < nodes.length; idx++)
+     {
+       nodes[idx] =
+           new MockParticipant(clusterName, "localhost_" + (basePort + idx), ZK_ADDR, null);
+       nodes[idx].syncStart();
+     }
+
+     BestPossAndExtViewZkVerifier verifier =
+         new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
+     boolean converged = ClusterStateVerifier.verifyByZkCallback(verifier);
+     Assert.assertTrue(converged);
+
+     // clean up
+     for (MockParticipant node : nodes)
+     {
+       node.syncStop();
+     }
+
+     Thread.sleep(2000);
+     controller.syncStop();
+
+     System.out.println("END " + clusterName + " at "
+         + new Date(System.currentTimeMillis()));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
new file mode 100644
index 0000000..02103e3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -0,0 +1,76 @@
+package org.apache.helix.integration;
+
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase
+{
+ @Test()
+ public void TestParticipantErrorMessageSend()
+ {
+ String participant1 = "localhost_" + START_PORT;
+ String participant2 = "localhost_" + (START_PORT + 1);
+
+ Message errorMessage1
+ = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
+ errorMessage1.setTgtSessionId("*");
+ errorMessage1.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_INSTANCE.toString());
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+ recipientCriteria.setSessionSpecific(false);
+ _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, errorMessage1);
+
+ Message errorMessage2
+ = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
+ errorMessage2.setTgtSessionId("*");
+ errorMessage2.setResourceName("TestDB");
+ errorMessage2.setPartitionName("TestDB_14");
+ errorMessage2.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_PARTITION.toString());
+ Criteria recipientCriteria2 = new Criteria();
+ recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ recipientCriteria2.setSessionSpecific(false);
+ _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, errorMessage2);
+
+ try
+ {
+ Thread.sleep(1500);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+ ExternalView externalView = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
+
+ for(String partitionName : externalView.getRecord().getMapFields().keySet())
+ {
+ for(String hostName : externalView.getRecord().getMapField(partitionName).keySet())
+ {
+ if(hostName.equals(participant1))
+ {
+ Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName).equalsIgnoreCase("OFFLINE"));
+ }
+ }
+ }
+ Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2).equalsIgnoreCase("OFFLINE"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
new file mode 100644
index 0000000..9715380
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+
+public class TestParticipantNameCollision extends ZkStandAloneCMTestBase
+{
+ private static Logger logger = Logger.getLogger(TestParticipantNameCollision.class);
+
+ @Test()
+ public void testParticiptantNameCollision() throws Exception
+ {
+ logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
+
+ StartCMResult result = null;
+ for (int i = 0; i < 1; i++)
+ {
+ String instanceName = "localhost_" + (START_PORT + i);
+ try
+ {
+ // the call fails on getClusterManagerForParticipant()
+ // no threads start
+ result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+
+ Thread.sleep(30000);
+ TestHelper.verifyWithTimeout("verifyNotConnected", result._manager);
+
+ logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
new file mode 100644
index 0000000..0c00912
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -0,0 +1,121 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestPauseSignal extends ZkIntegrationTestBase
+{
+ @Test()
+ public void testPauseSignal() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ final String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[5];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // pause the cluster and make sure pause is persistent
+ ZkClient zkClient = new ZkClient(ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ final HelixDataAccessor tmpAccessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+ String cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " false";
+ ClusterSetup.processCommandLineArgs(cmd.split(" "));
+
+ tmpAccessor.setProperty(tmpAccessor.keyBuilder().pause(), new PauseSignal("pause"));
+ zkClient.close();
+
+ // wait for controller to be signaled by pause
+ Thread.sleep(1000);
+
+ // add a new resource group
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
+ setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+ // make sure TestDB1 external view is empty
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+ 1000,
+ clusterName,
+ "TestDB1",
+ TestHelper.<String> setOf("localhost_12918",
+ "localhost_12919",
+ "localhost_12920",
+ "localhost_12921",
+ "localhost_12922"),
+ ZK_ADDR);
+
+ // resume controller
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+ cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " true";
+ ClusterSetup.processCommandLineArgs(cmd.split(" "));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ Thread.sleep(2000);
+ controller.syncStop();
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
new file mode 100644
index 0000000..feb487b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+ /**
+  * Integration tests that rename one partition key of resource TestDB0 in the
+  * ideal state (TestDB0_0 becomes TestDB0_100) after the cluster has converged,
+  * and expect the cluster to converge again.
+  */
+ public class TestRenamePartition extends ZkIntegrationTestBase
+ {
+   /**
+    * Renames the partition in the list fields of a rebalanced ideal state.
+    *
+    * @throws Exception if setup or verification fails
+    */
+   @Test()
+   public void testRenamePartitionAutoIS() throws Exception
+   {
+     String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
+     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+                             "localhost", // participant name prefix
+                             "TestDB", // resource name prefix
+                             1, // resources
+                             10, // partitions per resource
+                             5, // number of nodes
+                             3, // replicas
+                             "MasterSlave", true); // do rebalance
+
+     startAndVerify(clusterName);
+
+     // rename partition name TestDB0_0 to TestDB0_100
+     ZKHelixDataAccessor accessor =
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+
+     IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+
+     List<String> prioList = idealState.getRecord().getListFields().remove("TestDB0_0");
+     idealState.getRecord().getListFields().put("TestDB0_100", prioList);
+     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+     boolean result = ClusterStateVerifier.verifyByPolling(
+         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+     Assert.assertTrue(result);
+
+     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+   }
+
+   /**
+    * Renames the partition in the map fields of an ideal state that is computed
+    * by the test and written in CUSTOMIZED mode.
+    *
+    * @throws Exception if setup or verification fails
+    */
+   @Test()
+   public void testRenamePartitionCustomIS() throws Exception
+   {
+     String clusterName = "CLUSTER_" + getShortClassName() + "_custom";
+     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+                             "localhost", // participant name prefix
+                             "TestDB", // resource name prefix
+                             1, // resources
+                             10, // partitions per resource
+                             5, // number of nodes
+                             3, // replicas
+                             "MasterSlave", false); // do rebalance
+
+     // calculate idealState
+     List<String> instanceNames = Arrays.asList("localhost_12918", "localhost_12919", "localhost_12920",
+                                                "localhost_12921", "localhost_12922");
+     ZNRecord destIS = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
+         10, 3-1, "TestDB0", "MASTER", "SLAVE");
+     IdealState idealState = new IdealState(destIS);
+     idealState.setIdealStateMode("CUSTOMIZED");
+     idealState.setReplicas("3");
+     idealState.setStateModelDefRef("MasterSlave");
+
+     ZKHelixDataAccessor accessor =
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+
+     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+     startAndVerify(clusterName);
+
+     // rename partition name TestDB0_0 to TestDB0_100
+     Map<String, String> stateMap = idealState.getRecord().getMapFields().remove("TestDB0_0");
+     idealState.getRecord().getMapFields().put("TestDB0_100", stateMap);
+     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+     boolean result = ClusterStateVerifier.verifyByPolling(
+         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+     Assert.assertTrue(result);
+     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+   }
+
+   /**
+    * Starts a standalone controller and five mock participants for the given
+    * cluster, then asserts that the cluster state verifier succeeds.
+    *
+    * @param clusterName cluster to start and verify
+    * @throws Exception if start-up or verification fails
+    */
+   private void startAndVerify(String clusterName) throws Exception
+   {
+     MockParticipant[] participants = new MockParticipant[5];
+
+     TestHelper.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+
+     // start participants
+     for (int i = 0; i < participants.length; i++)
+     {
+       String instanceName = "localhost_" + (12918 + i);
+
+       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+       participants[i].syncStart();
+     }
+
+     boolean result = ClusterStateVerifier.verifyByPolling(
+         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+     Assert.assertTrue(result);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
new file mode 100644
index 0000000..24ee3a3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -0,0 +1,113 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+ /**
+  * Integration test for the --resetInstance command: localhost_12918 is started
+  * with injected transition errors for two partitions, the cluster is verified
+  * with those partitions in the expected error map, then the instance is reset
+  * and the cluster must converge without errors.
+  */
+ public class TestResetInstance extends ZkIntegrationTestBase
+ {
+
+   /**
+    * Runs the error-injection / reset scenario on a five-node MasterSlave cluster.
+    *
+    * @throws Exception if setup, command processing or shutdown fails
+    */
+   @Test
+   public void testResetInstance() throws Exception
+   {
+     String className = TestHelper.getTestClassName();
+     String methodName = TestHelper.getTestMethodName();
+     String clusterName = className + "_" + methodName;
+     final int n = 5;
+
+     System.out.println("START " + clusterName + " at "
+         + new Date(System.currentTimeMillis()));
+
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                             "localhost", // participant name prefix
+                             "TestDB", // resource name prefix
+                             1, // resources
+                             10, // partitions per resource
+                             n, // number of nodes
+                             3, // replicas
+                             "MasterSlave",
+                             true); // do rebalance
+
+     // start controller
+     ClusterController controller =
+         new ClusterController(clusterName, "controller_0", ZK_ADDR);
+     controller.syncStart();
+
+     // transition name -> partitions for which that transition is made to fail
+     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+     errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+     errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+
+     // start mock participants
+     MockParticipant[] participants = new MockParticipant[n];
+     for (int i = 0; i < n; i++)
+     {
+       String instanceName = "localhost_" + (12918 + i);
+
+       if (i == 0)
+       {
+         // only the first participant gets the failing transitions
+         participants[i] =
+             new MockParticipant(clusterName,
+                                 instanceName,
+                                 ZK_ADDR,
+                                 new ErrTransition(errPartitions));
+       }
+       else
+       {
+         participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+       }
+       participants[i].syncStart();
+     }
+
+     // verify cluster
+     Map<String, Map<String, String>> errStateMap =
+         new HashMap<String, Map<String, String>>();
+     errStateMap.put("TestDB0", new HashMap<String, String>());
+     errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+     errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                        clusterName,
+                                                                                                        errStateMap)));
+     Assert.assertTrue(result, "Cluster verification fails");
+
+     // reset node "localhost_12918"
+     participants[0].setTransition(null);
+     String command =
+         "--zkSvr " + ZK_ADDR + " --resetInstance " + clusterName
+             + " localhost_12918";
+     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+     result =
+         ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                        clusterName)));
+     Assert.assertTrue(result, "Cluster verification fails");
+
+     // clean up
+     // wait for all zk callbacks done
+     Thread.sleep(1000);
+     controller.syncStop();
+     for (int i = 0; i < n; i++)
+     {
+       participants[i].syncStop();
+     }
+
+     System.out.println("END " + clusterName + " at "
+         + new Date(System.currentTimeMillis()));
+
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
new file mode 100644
index 0000000..4860778
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -0,0 +1,224 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestResetPartitionState extends ZkIntegrationTestBase
+{
+ int _errToOfflineInvoked = 0;
+
+ class ErrTransitionWithResetCnt extends ErrTransition
+ {
+ public ErrTransitionWithResetCnt(Map<String, Set<String>> errPartitions)
+ {
+ super(errPartitions);
+ }
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ // System.err.println("doReset() invoked");
+ super.doTransition(message, context);
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ if (fromState.equals("ERROR") && toState.equals("OFFLINE"))
+ {
+ _errToOfflineInvoked++;
+ }
+ }
+
+ }
+
+ @Test()
+ public void testResetPartitionState() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+ {
+ {
+ put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+ }
+ };
+
+ // start mock participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ if (i == 0)
+ {
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ new ErrTransition(errPartitions));
+ }
+ else
+ {
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ }
+ participants[i].syncStart();
+ }
+
+ // verify cluster
+ Map<String, Map<String, String>> errStateMap =
+ new HashMap<String, Map<String, String>>();
+ errStateMap.put("TestDB0", new HashMap<String, String>());
+ errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+ errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+
+ // reset a non-exist partition, should throw exception
+ try
+ {
+ String command =
+ "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+ + " localhost_12918 TestDB0 TestDB0_nonExist";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ Assert.fail("Should throw exception on reset a non-exist partition");
+ }
+ catch (Exception e)
+ {
+ // OK
+ }
+
+ // reset one error partition
+ errPartitions.remove("SLAVE-MASTER");
+ participants[0].setTransition(new ErrTransitionWithResetCnt(errPartitions));
+ clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_4");
+ _errToOfflineInvoked = 0;
+ String command =
+ "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+ + " localhost_12918 TestDB0 TestDB0_4";
+
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ Thread.sleep(200); // wait reset to be done
+ try
+ {
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ Assert.fail("Should throw exception on reset a partition not in ERROR state");
+ }
+ catch (Exception e)
+ {
+ // OK
+ }
+
+ errStateMap.get("TestDB0").remove("TestDB0_4");
+ result =
+ ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+ Assert.assertEquals(_errToOfflineInvoked, 1);
+
+ // reset the other error partition
+ participants[0].setTransition(new ErrTransitionWithResetCnt(null));
+ clearStatusUpdate(clusterName, "localhost_12918", "TestDB0", "TestDB0_8");
+
+ command =
+ "--zkSvr " + ZK_ADDR + " --resetPartition " + clusterName
+ + " localhost_12918 TestDB0 TestDB0_8";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result, "Cluster verification fails");
+ Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ private void clearStatusUpdate(String clusterName,
+ String instance,
+ String resource,
+ String partition)
+ {
+ // clear status update for error partition so verify() will not fail on old
+ // errors
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
+ accessor.removeProperty(keyBuilder.stateTransitionStatus(instance,
+ liveInstance.getSessionId(),
+ resource,
+ partition));
+
+ }
+ // TODO: throw exception in reset()
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
new file mode 100644
index 0000000..1d199d3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -0,0 +1,112 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that starts a five-node cluster in which the first
+ * participant runs an ErrTransition for two TestDB0 partitions, issues the
+ * ClusterSetup "--resetResource" command, and verifies the cluster again.
+ */
+public class TestResetResource extends ZkIntegrationTestBase
+{
+  @Test
+  public void testResetNode() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // partitions handed to the ErrTransition; the keys look like FROM-TO state
+    // pairs. A plain HashMap is used instead of double-brace initialization,
+    // which would create an anonymous subclass tied to this test instance.
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+    errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+
+    // start mock participants; only the first one gets the ErrTransition
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      }
+      participants[i].syncStart();
+    }
+
+    // verify cluster; the two partitions above are listed for localhost_12918
+    // in the error-state map handed to the verifier
+    Map<String, Map<String, String>> errStateMap =
+        new HashMap<String, Map<String, String>>();
+    errStateMap.put("TestDB0", new HashMap<String, String>());
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // reset resource "TestDB0"
+    participants[0].setTransition(null);
+    String command =
+        "--zkSvr " + ZK_ADDR + " --resetResource " + clusterName + " TestDB0";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
new file mode 100644
index 0000000..7df10dc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -0,0 +1,115 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test in which one participant is stopped from inside another
+ * participant's transition and is then started again under the same instance
+ * name; the cluster is verified both before and after the restart.
+ */
+public class TestRestartParticipant extends ZkIntegrationTestBase
+{
+  /**
+   * Transition that stops the given participant the first time doTransition()
+   * runs. The reference is cleared with getAndSet(null), so every later call
+   * finds nothing left to stop.
+   */
+  public class KillOtherTransition extends MockTransition
+  {
+    final AtomicReference<MockParticipant> _other;
+
+    public KillOtherTransition(MockParticipant other)
+    {
+      _other = new AtomicReference<MockParticipant>(other);
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      MockParticipant other = _other.getAndSet(null);
+      if (other != null)
+      {
+        System.err.println("Kill " + other.getInstanceName()
+            + ". Interrupted exceptions are IGNORABLE");
+        other.syncStop();
+      }
+    }
+  }
+
+  @Test()
+  public void testRestartParticipant() throws Exception
+  {
+    System.out.println("START testRestartParticipant at "
+        + new Date(System.currentTimeMillis()));
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+
+    // start participants; the last one is given a transition that stops
+    // participants[0], which has already been started by then
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 4)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new KillOtherTransition(participants[0]));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // restart: new participant with the cluster/instance names of participants[0]
+    Thread.sleep(500);
+    MockParticipant participant =
+        new MockParticipant(participants[0].getClusterName(),
+                            participants[0].getInstanceName(), ZK_ADDR, null);
+    System.err.println("Restart " + participant.getInstanceName());
+    participant.syncStart();
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // log END here; the closing message previously repeated START
+    System.out.println("END testRestartParticipant at "
+        + new Date(System.currentTimeMillis()));
+  }
+}