You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:16 UTC
[03/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 2354ebd..457b5fb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -87,8 +87,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
String msgId = new UUID(123, 456).toString();
Message msg = new Message(factory.getMessageType(), msgId);
@@ -104,7 +104,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
cr.setSessionSpecific(false);
- int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ int nMsgs = _participants[0].getMessagingService().send(cr, msg);
AssertJUnit.assertTrue(nMsgs == 1);
Thread.sleep(2500);
// Thread.currentThread().join();
@@ -116,7 +117,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSessionSpecific(false);
cr.setDataSource(DataSource.IDEALSTATES);
- nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+ nMsgs = _participants[0].getMessagingService().send(cr, msg);
AssertJUnit.assertTrue(nMsgs == 1);
Thread.sleep(2500);
// Thread.currentThread().join();
@@ -179,11 +181,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
String msgId = new UUID(123, 456).toString();
Message msg = new Message(factory.getMessageType(), msgId);
@@ -202,7 +204,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
TestAsyncCallback callback = new TestAsyncCallback(60000);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+ _participants[0].getMessagingService().send(cr, msg, callback, 60000);
Thread.sleep(2000);
// Thread.currentThread().join();
@@ -210,7 +212,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
TestAsyncCallback callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().send(cr, msg, callback2, 500);
Thread.sleep(3000);
// Thread.currentThread().join();
@@ -224,7 +226,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
callback = new TestAsyncCallback(60000);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+ _participants[0].getMessagingService().send(cr, msg, callback, 60000);
Thread.sleep(2000);
// Thread.currentThread().join();
@@ -232,7 +234,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
callback2 = new TestAsyncCallback(500);
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().send(cr, msg, callback2, 500);
Thread.sleep(3000);
// Thread.currentThread().join();
@@ -246,8 +248,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
String hostDest = "localhost_" + (START_PORT + 1);
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
- factory.getMessageType(), factory);
+ _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+ factory);
String msgId = new UUID(123, 456).toString();
Message msg = new Message(factory.getMessageType(), msgId);
@@ -266,8 +268,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AsyncCallback asyncCallback = new MockAsyncCallback();
int messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- asyncCallback, 60000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -275,9 +276,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
AsyncCallback asyncCallback2 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- asyncCallback2, 500);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
}
@@ -289,8 +288,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ // register the handler on every participant i, not only on participant 0
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
}
String msgId = new UUID(123, 456).toString();
Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -308,8 +308,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
int messageSent1 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -317,37 +316,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
AsyncCallback callback2 = new MockAsyncCallback();
- int messageSent2 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback2, 500);
+ int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+
AssertJUnit.assertTrue(callback2.isTimedOut());
cr.setPartition("TestDB_17");
AsyncCallback callback3 = new MockAsyncCallback();
int messageSent3 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
cr.setPartition("TestDB_15");
AsyncCallback callback4 = new MockAsyncCallback();
int messageSent4 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback4, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
cr.setPartitionState("SLAVE");
AsyncCallback callback5 = new MockAsyncCallback();
int messageSent5 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback5, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
cr.setDataSource(DataSource.IDEALSTATES);
AsyncCallback callback6 = new MockAsyncCallback();
int messageSent6 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback6, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
}
@@ -358,8 +352,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
+
}
String msgId = new UUID(123, 456).toString();
Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -378,8 +373,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
cr.setSelfExcluded(false);
AsyncCallback callback1 = new MockAsyncCallback();
int messageSent1 =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -394,8 +388,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
+
}
String msgId = new UUID(123, 456).toString();
Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
@@ -414,8 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
AsyncCallback callback1 = new MockAsyncCallback();
int messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -426,9 +420,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
msg.setMsgId(msgId);
cr.setPartition("TestDB_17");
AsyncCallback callback2 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback2, 10000);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+
AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
@@ -439,9 +432,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
msg.setMsgId(msgId);
cr.setPartitionState("SLAVE");
AsyncCallback callback3 = new MockAsyncCallback();
- messagesSent =
- _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
- callback3, 10000);
+ messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 5a6e5e6..734e2b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,9 +22,9 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.tools.ClusterSetup;
@@ -51,18 +51,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
1, // replicas
"Bootstrap", true); // do rebalance
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
// add a state model with non-OFFLINE initial state
- StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+ StateMachineEngine stateMach = participants[i].getStateMachineEngine();
MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
@@ -74,16 +75,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
}
private static void setupCluster(String clusterName, String ZkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
- ZkClient zkClient = new ZkClient(ZkAddr);
- if (zkClient.exists("/" + clusterName)) {
+ if (_gZkClient.exists("/" + clusterName)) {
LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
- zkClient.deleteRecursive("/" + clusterName);
+ _gZkClient.deleteRecursive("/" + clusterName);
}
ClusterSetup setupTool = new ClusterSetup(ZkAddr);
@@ -103,7 +109,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
}
}
- zkClient.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
index 496d1a6..0f5cc72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase {
idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
_gZkClient.writeData(idealStatePath, idealState);
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
index 8336939..cf59ed0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -31,10 +31,13 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
+ private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class);
+
@Test()
public void TestParticipantErrorMessageSend() {
String participant1 = "localhost_" + START_PORT;
@@ -49,7 +52,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
recipientCriteria.setSessionSpecific(false);
- _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria,
+ _participants[0].getMessagingService().send(recipientCriteria,
errorMessage1);
Message errorMessage2 =
@@ -63,23 +66,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
Criteria recipientCriteria2 = new Criteria();
recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
recipientCriteria2.setSessionSpecific(false);
- _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2,
+ _participants[1].getMessagingService().send(recipientCriteria2,
errorMessage2);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted sleep", e);
}
boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
CLUSTER_NAME));
Assert.assertTrue(result);
- Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+ Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
ExternalView externalView =
- _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(
+ _participants[1].getHelixDataAccessor().getProperty(
kb.externalView("TestDB"));
for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
index 4227688..aa67ac9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -22,7 +22,7 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.log4j.Logger;
import org.testng.annotations.Test;
@@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase {
public void testParticiptantNameCollision() throws Exception {
logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
- StartCMResult result = null;
+ MockParticipantManager newParticipant = null;
for (int i = 0; i < 1; i++) {
String instanceName = "localhost_" + (START_PORT + i);
try {
// the call fails on getClusterManagerForParticipant()
// no threads start
- result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ newParticipant.syncStart();
} catch (Exception e) {
e.printStackTrace();
}
}
Thread.sleep(30000);
- TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager);
+ TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant);
logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
index d900d98..a1c413b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -24,12 +24,12 @@ import java.util.Date;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
index ed056ab..5f4377d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -23,14 +23,15 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -39,6 +40,14 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestRenamePartition extends ZkIntegrationTestBase {
+ // map from clusterName to participants
+ final Map<String, MockParticipantManager[]> _participantMap =
+ new ConcurrentHashMap<String, MockParticipantManager[]>();
+
+ // map from clusterName to controllers
+ final Map<String, ClusterControllerManager> _controllerMap =
+ new ConcurrentHashMap<String, ClusterControllerManager>();
+
@Test()
public void testRenamePartitionAutoIS() throws Exception {
String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
@@ -57,7 +66,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
// rename partition name TestDB0_0 tp TestDB0_100
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
@@ -71,8 +80,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+ stop(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
}
@Test()
@@ -103,7 +112,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
idealState.setStateModelDefRef("MasterSlave");
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
@@ -118,23 +127,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+
+ stop(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
private void startAndVerify(String clusterName) throws Exception {
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -142,5 +153,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+ _participantMap.put(clusterName, participants);
+ _controllerMap.put(clusterName, controller);
+ }
+
+ // Stops the controller registered for clusterName (if any), then each of its participants.
+ private void stop(String clusterName) {
+   final ClusterControllerManager clusterController = _controllerMap.get(clusterName);
+   if (clusterController != null) {
+     clusterController.syncStop();
+   }
+
+   // A cluster that was never recorded in _participantMap has nothing to stop.
+   final MockParticipantManager[] managers = _participantMap.get(clusterName);
+   for (int idx = 0; managers != null && idx < managers.length; idx++) {
+     managers[idx].syncStop();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 2715932..7159a17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -25,8 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 09e57c6..c37a5ea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -28,10 +28,10 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -80,7 +80,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -91,16 +92,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -169,8 +170,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index de4ad1a..46a05d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -25,8 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
index 008782c..64043ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -35,15 +35,15 @@ import org.testng.annotations.Test;
public class TestRestartParticipant extends ZkIntegrationTestBase {
public class KillOtherTransition extends MockTransition {
- final AtomicReference<MockParticipant> _other;
+ final AtomicReference<MockParticipantManager> _other;
- public KillOtherTransition(MockParticipant other) {
- _other = new AtomicReference<MockParticipant>(other);
+ public KillOtherTransition(MockParticipantManager other) {
+ _other = new AtomicReference<MockParticipantManager>(other);
}
@Override
public void doTransition(Message message, NotificationContext context) {
- MockParticipant other = _other.getAndSet(null);
+ MockParticipantManager other = _other.getAndSet(null);
if (other != null) {
System.err.println("Kill " + other.getInstanceName()
+ ". Interrupted exceptions are IGNORABLE");
@@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 4) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition(
- participants[0]));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new KillOtherTransition(participants[0]));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
- // Thread.sleep(100);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
@@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
// restart
Thread.sleep(500);
- MockParticipant participant =
- new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(),
- ZK_ADDR, null);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
+ participants[0].getInstanceName());
System.err.println("Restart " + participant.getInstanceName());
participant.syncStart();
result =
@@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+ participant.syncStop();
+
System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 2c174c4..bf851cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -194,10 +194,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -282,10 +282,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -367,11 +367,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
String controllerStatusPath =
HelixUtil.getControllerPropertyPath(manager.getClusterName(),
PropertyType.STATUSUPDATES_CONTROLLER);
- List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+ List<String> subPaths = _gZkClient.getChildren(controllerStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
}
@@ -379,38 +379,38 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
PropertyType.STATUSUPDATES);
- subPaths = _zkClient.getChildren(instanceStatusPath);
+ subPaths = _gZkClient.getChildren(instanceStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
for (String subsubPath : subsubPaths) {
String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+ Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() > 0);
}
}
Thread.sleep(3000);
- ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+ ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
dumpTask.run();
- subPaths = _zkClient.getChildren(controllerStatusPath);
+ subPaths = _gZkClient.getChildren(controllerStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() == 0);
}
- subPaths = _zkClient.getChildren(instanceStatusPath);
+ subPaths = _gZkClient.getChildren(instanceStatusPath);
Assert.assertTrue(subPaths.size() > 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ List<String> subsubPaths = _gZkClient.getChildren(nextPath);
Assert.assertTrue(subsubPaths.size() > 0);
for (String subsubPath : subsubPaths) {
String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+ Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() == 0);
}
}
}
@@ -420,10 +420,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -511,10 +511,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -581,13 +581,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- //
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -702,13 +702,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- //
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
@@ -852,13 +852,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
- //
- _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+ _participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
+
+ manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
}
Message schedulerMessage =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
index 3024f45..a927520 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -27,10 +27,10 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -46,7 +46,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
String clusterName = className + "_" + methodName;
int n = 5;
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -70,14 +70,15 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
accessor.setProperty(key, idealState);
- ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
controller.syncStart();
// start n-1 participants
for (int i = 1; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -87,7 +88,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// start the remaining 1 participant
- participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR, null);
+ participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
participants[0].syncStart();
// make sure we have all participants in MASTER state
@@ -107,6 +108,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
}
// clean up
+ controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index f38c6de..965b8ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -25,10 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -39,13 +38,14 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
+ private static Logger LOG = Logger.getLogger(TestSessionExpiryInTransition.class);
public class SessionExpiryTransition extends MockTransition {
private final AtomicBoolean _done = new AtomicBoolean();
@Override
public void doTransition(Message message, NotificationContext context) {
- ZkHelixTestManager manager = (ZkHelixTestManager) context.getManager();
+ MockParticipantManager manager = (MockParticipantManager) context.getManager();
String instance = message.getTgtName();
String partition = message.getPartitionName();
@@ -55,8 +55,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
try {
ZkTestHelper.expireSession(manager.getZkClient());
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception expire zk-session", e);
}
}
}
@@ -64,7 +63,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
@Test
public void testSessionExpiryInTransition() throws Exception {
- Logger.getRootLogger().setLevel(Level.WARN);
+ // Logger.getRootLogger().setLevel(Level.WARN);
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
@@ -72,7 +71,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -84,15 +83,15 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- ZkHelixTestManager manager =
- new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
- participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SessionExpiryTransition());
participants[i].syncStart();
}
@@ -102,13 +101,11 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
- Thread.sleep(2000);
- controller.syncStop();
-
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
index 02a34d8..6eb7a8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -21,9 +21,15 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -35,16 +41,34 @@ public class TestStandAloneCMMain extends ZkStandAloneCMTestBase {
@Test()
public void testStandAloneCMMain() throws Exception {
logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
+ ClusterControllerManager newController = null;
for (int i = 1; i <= 2; i++) {
String controllerName = "controller_" + i;
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ newController =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ newController.syncStart();
}
- stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+ // stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+ _controller.syncStop();
+
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ final String newControllerName = newController.getInstanceName();
+ TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null) {
+ return false;
+ }
+ return leader.getInstanceName().equals(newControllerName);
+
+ }
+ }, 30 * 1000);
+
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, CLUSTER_NAME));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
index 347ff7e..e8adf03 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -23,9 +23,9 @@ import java.util.Date;
import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -47,18 +47,16 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, PARTICIPANT_PREFIX, "TestDB", 1, 20, 5, 3,
"MasterSlave", true);
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- ZkHelixTestManager manager =
- new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
- participants[i] = new MockParticipant(manager, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
- ZkHelixTestManager controller =
- new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
boolean result;
result =
@@ -67,7 +65,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// participant session expiry
- ZkHelixTestManager participantToExpire = (ZkHelixTestManager) participants[1].getManager();
+ MockParticipantManager participantToExpire = participants[1];
System.out.println("Expire participant session");
String oldSessionId = participantToExpire.getSessionId();
@@ -107,8 +105,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
// clean up
System.out.println("Clean up ...");
// Logger.getRootLogger().setLevel(Level.DEBUG);
- controller.disconnect();
- Thread.sleep(100);
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index dce3fd4..d191c18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,7 +25,7 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -54,10 +54,10 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
// rebalance
// start controller
- ClusterController[] controllers = new ClusterController[4];
+ ClusterControllerManager[] controllers = new ClusterControllerManager[4];
for (int i = 0; i < 4; i++) {
- controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controllers[i].start();
+ controllers[i] = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controllers[i].syncStart();
}
Thread.sleep(500); // wait leader election finishes
@@ -69,7 +69,6 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
// clean up
for (int i = 0; i < 4; i++) {
controllers[i].syncStop();
- Thread.sleep(1000); // wait for all zk callbacks done
}
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index edc10c6..a297752 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -28,22 +28,16 @@ import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
-import org.apache.helix.TestHelper;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
-import org.apache.helix.mock.participant.MockJobIntf;
import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.mock.participant.SleepTransition;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -59,15 +53,14 @@ import org.testng.annotations.Test;
public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
_setupTool = new ClusterSetup(ZK_ADDR);
@@ -106,12 +99,14 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
_sleep = sleep;
}
+ @Override
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
LOG.info("Become SLAVE from OFFLINE");
}
+ @Override
@Transition(to = "MASTER", from = "SLAVE")
public void onBecomeMasterFromSlave(Message message, NotificationContext context)
throws InterruptedException {
@@ -121,23 +116,27 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
+ @Override
@Transition(to = "SLAVE", from = "MASTER")
public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
LOG.info("Become SLAVE from MASTER");
}
+ @Override
@Transition(to = "OFFLINE", from = "SLAVE")
public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
LOG.info("Become OFFLINE from SLAVE");
}
+ @Override
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
LOG.info("Become DROPPED from OFFLINE");
}
+ @Override
public void rollbackOnError(Message message, NotificationContext context,
StateTransitionError error) {
_error = error;
@@ -171,7 +170,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
@Test
public void testStateTransitionTimeOut() throws Exception {
Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
- MockParticipant[] participants = new MockParticipant[NODE_NR];
+ // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
for (int i = 0; i < NODE_NR; i++) {
@@ -184,19 +183,20 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
- participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+ _participants[i].syncStart();
}
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
boolean result =
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+ HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
Builder kb = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index a1f63aa..6cce716 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -21,9 +21,8 @@ package org.apache.helix.integration;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,8 +32,7 @@ import org.testng.annotations.Test;
public class TestSwapInstance extends ZkStandAloneCMTestBase {
@Test
public void TestSwap() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
_setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
@@ -49,7 +47,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
idealStateOld2.merge(is2.getRecord());
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
_setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
boolean result =
@@ -68,8 +66,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
}
Assert.assertTrue(exception);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
Thread.sleep(1000);
exception = false;
@@ -80,8 +77,9 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
exception = true;
}
Assert.assertFalse(exception);
- StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
- _startCMResultMap.put(instanceName2, result2);
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2);
+ newParticipant.syncStart();
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(