You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:19 UTC
[04/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
index 8cbe55b..0c7c131 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -29,13 +29,13 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.Message;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -49,8 +49,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
// Logger.getRootLogger().setLevel(Level.INFO);
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
- // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -114,13 +113,15 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
});
}
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -137,6 +138,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
Assert.assertTrue(success.get());
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index a947445..a182753 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -32,7 +32,6 @@ import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
@@ -246,8 +245,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
public void start() throws Exception {
helixManager =
- HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
- InstanceType.PARTICIPANT, ZK_ADDR);
+ new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
{
// hack to set sessionTimeout
Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 9c6b4b7..86f1ce4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -89,8 +89,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(factory.getMessageType(), msgId);
@@ -106,7 +106,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
cr.setSessionSpecific(false);
- int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ int nMsgs = _participants[0].getMessagingService().send(cr, msg);
AssertJUnit.assertTrue(nMsgs == 1);
Thread.sleep(2500);
// Thread.currentThread().join();
@@ -118,7 +119,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSessionSpecific(false);
cr.setDataSource(DataSource.IDEALSTATES);
- nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ nMsgs = _participants[0].getMessagingService().send(cr, msg);
AssertJUnit.assertTrue(nMsgs == 1);
Thread.sleep(2500);
// Thread.currentThread().join();
@@ -181,11 +183,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(factory.getMessageType(), msgId);
@@ -204,7 +206,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
TestAsyncCallback callback = new TestAsyncCallback(60000);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+ _participants[0].getMessagingService().send(cr, msg, callback, 60000);
Thread.sleep(2000);
// Thread.currentThread().join();
@@ -212,7 +214,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
TestAsyncCallback callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().send(cr, msg, callback2, 500);
Thread.sleep(3000);
// Thread.currentThread().join();
@@ -226,7 +228,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
callback = new TestAsyncCallback(60000);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+ _participants[0].getMessagingService().send(cr, msg, callback, 60000);
Thread.sleep(2000);
// Thread.currentThread().join();
@@ -234,7 +236,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().send(cr, msg, callback2, 500);
Thread.sleep(3000);
// Thread.currentThread().join();
@@ -248,8 +250,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(factory.getMessageType(), msgId);
@@ -268,8 +270,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AsyncCallback asyncCallback = new MockAsyncCallback();
int messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- asyncCallback, 60000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -277,9 +278,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
AsyncCallback asyncCallback2 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- asyncCallback2, 500);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
}
@@ -291,8 +290,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[0].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
+
}
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -310,8 +310,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
int messageSent1 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -319,37 +318,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
AsyncCallback callback2 = new MockAsyncCallback();
- int messageSent2 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback2, 500);
+ int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+
AssertJUnit.assertTrue(callback2.isTimedOut());
cr.setPartition("TestDB_17");
AsyncCallback callback3 = new MockAsyncCallback();
int messageSent3 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
cr.setPartition("TestDB_15");
AsyncCallback callback4 = new MockAsyncCallback();
int messageSent4 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback4, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
cr.setPartitionState("SLAVE");
AsyncCallback callback5 = new MockAsyncCallback();
int messageSent5 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback5, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
cr.setDataSource(DataSource.IDEALSTATES);
AsyncCallback callback6 = new MockAsyncCallback();
int messageSent6 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback6, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
}
@@ -360,8 +354,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
+
}
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -380,8 +375,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSelfExcluded(false);
AsyncCallback callback1 = new MockAsyncCallback();
int messageSent1 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -396,8 +390,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
+
}
MessageId msgId = MessageId.from(new UUID(123, 456).toString());
Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
@@ -416,8 +411,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AsyncCallback callback1 = new MockAsyncCallback();
int messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -428,9 +422,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
msg.setMessageId(msgId);
cr.setPartition("TestDB_17");
AsyncCallback callback2 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback2, 10000);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+
AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
@@ -441,9 +434,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
msg.setMessageId(msgId);
cr.setPartitionState("SLAVE");
AsyncCallback callback3 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback3, 10000);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index c5558ca..aa48c90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,10 +22,9 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -51,18 +50,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
1, // replicas
"Bootstrap", true); // do rebalance
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
// add a state model with non-OFFLINE initial state
- StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+ StateMachineEngine stateMach = participants[i].getStateMachineEngine();
MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
@@ -74,16 +74,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
}
private static void setupCluster(String clusterName, String ZkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
- ZkClient zkClient = new ZkClient(ZkAddr);
- if (zkClient.exists("/" + clusterName)) {
+ if (_gZkClient.exists("/" + clusterName)) {
LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
- zkClient.deleteRecursive("/" + clusterName);
+ _gZkClient.deleteRecursive("/" + clusterName);
}
ClusterSetup setupTool = new ClusterSetup(ZkAddr);
@@ -103,7 +108,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
}
}
- zkClient.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
index 496d1a6..0f5cc72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase {
idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
_gZkClient.writeData(idealStatePath, idealState);
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
index 07a2fc0..750e2b7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -35,10 +35,13 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
+ private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class);
+
@Test()
public void TestParticipantErrorMessageSend() {
String participant1 = "localhost_" + START_PORT;
@@ -54,7 +57,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
recipientCriteria.setSessionSpecific(false);
- _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria,
+ _participants[0].getMessagingService().send(recipientCriteria,
errorMessage1);
Message errorMessage2 =
@@ -69,23 +72,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
Criteria recipientCriteria2 = new Criteria();
recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
recipientCriteria2.setSessionSpecific(false);
- _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2,
+ _participants[1].getMessagingService().send(recipientCriteria2,
errorMessage2);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted sleep", e);
}
boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
CLUSTER_NAME));
Assert.assertTrue(result);
- Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+ Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
ExternalView externalView =
- _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(
+ _participants[1].getHelixDataAccessor().getProperty(
kb.externalView("TestDB"));
for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
index 4227688..aa67ac9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -22,7 +22,7 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.log4j.Logger;
import org.testng.annotations.Test;
@@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase {
public void testParticiptantNameCollision() throws Exception {
logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
- StartCMResult result = null;
+ MockParticipantManager newParticipant = null;
for (int i = 0; i < 1; i++) {
String instanceName = "localhost_" + (START_PORT + i);
try {
// the call fails on getClusterManagerForParticipant()
// no threads start
- result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ newParticipant.syncStart();
} catch (Exception e) {
e.printStackTrace();
}
}
Thread.sleep(30000);
- TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager);
+ TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant);
logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
index d900d98..a1c413b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -24,12 +24,12 @@ import java.util.Date;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
index be2de65..f2af156 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
@@ -9,10 +9,10 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
@@ -49,15 +49,16 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase {
autoRebalance(clusterName);
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -70,7 +71,7 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase {
// stop controller, redefine state model definition, and re-start controller
controller.syncStop();
redefineStateModelDef(clusterName);
- controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
result =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
index e04cc79..d3a370d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -23,16 +23,17 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -40,6 +41,14 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestRenamePartition extends ZkIntegrationTestBase {
+ // map from clusterName to participants
+ final Map<String, MockParticipantManager[]> _participantMap =
+ new ConcurrentHashMap<String, MockParticipantManager[]>();
+
+ // map from clusterName to controllers
+ final Map<String, ClusterControllerManager> _controllerMap =
+ new ConcurrentHashMap<String, ClusterControllerManager>();
+
@Test()
public void testRenamePartitionAutoIS() throws Exception {
String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
@@ -72,8 +81,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+ stop(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
}
@Test()
@@ -104,7 +113,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
idealState.setStateModelDefId(StateModelDefId.from("MasterSlave"));
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
@@ -119,23 +128,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+
+ stop(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
private void startAndVerify(String clusterName) throws Exception {
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -143,5 +154,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+ _participantMap.put(clusterName, participants);
+ _controllerMap.put(clusterName, controller);
+ }
+
+ private void stop(String clusterName) {
+ ClusterControllerManager controller = _controllerMap.get(clusterName);
+ if (controller != null) {
+ controller.syncStop();
+ }
+
+ MockParticipantManager[] participants = _participantMap.get(clusterName);
+ if (participants != null) {
+ for (MockParticipantManager participant : participants) {
+ participant.syncStop();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 72920a8..7159a17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -25,9 +25,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 9479cff..af1ef13 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -31,9 +31,9 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -81,7 +81,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -92,16 +93,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -170,8 +171,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index f8b4dc9..46a05d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -25,9 +25,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
index 008782c..64043ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -35,15 +35,15 @@ import org.testng.annotations.Test;
public class TestRestartParticipant extends ZkIntegrationTestBase {
public class KillOtherTransition extends MockTransition {
- final AtomicReference<MockParticipant> _other;
+ final AtomicReference<MockParticipantManager> _other;
- public KillOtherTransition(MockParticipant other) {
- _other = new AtomicReference<MockParticipant>(other);
+ public KillOtherTransition(MockParticipantManager other) {
+ _other = new AtomicReference<MockParticipantManager>(other);
}
@Override
public void doTransition(Message message, NotificationContext context) {
- MockParticipant other = _other.getAndSet(null);
+ MockParticipantManager other = _other.getAndSet(null);
if (other != null) {
System.err.println("Kill " + other.getInstanceName()
+ ". Interrupted exceptions are IGNORABLE");
@@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 4) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition(
- participants[0]));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new KillOtherTransition(participants[0]));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- // Thread.sleep(100);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
@@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
// restart
Thread.sleep(500);
- MockParticipant participant =
- new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(),
- ZK_ADDR, null);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
+ participants[0].getInstanceName());
System.err.println("Restart " + participant.getInstanceName());
participant.syncStart();
result =
@@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+ participant.syncStop();
+
System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
}