You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:17 UTC
[04/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
index 9d63a65..97b6ebb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
@@ -39,7 +39,7 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
System.out.println("START testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -58,13 +58,15 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
_gZkClient.createPersistent(path, true);
_gZkClient.writeData(path, badCurState);
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -78,6 +80,11 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
index 781aa89..e7a7ea0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
@@ -20,11 +20,11 @@ package org.apache.helix.integration;
*/
import org.apache.helix.*;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -57,15 +57,16 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -80,7 +81,7 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
admin.enableCluster(clusterName, false);
// wait all pending zk-events being processed, otherwise remove current-state will cause
// controller send O->S message
- ZkTestHelper.tryWaitZkEventsCleaned(controller.getManager().getZkClient());
+ ZkTestHelper.tryWaitZkEventsCleaned(controller.getZkClient());
// System.out.println("paused controller");
// drop resource
@@ -117,8 +118,13 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
Assert.assertNull(externalView, "external-view for TestDB0 should be removed, but was: "
+ externalView);
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
index c0ced72..e844a27 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -26,7 +26,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyType;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.util.HelixUtil;
import org.testng.Assert;
@@ -40,8 +39,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
_setupTool = new ClusterSetup(ZK_ADDR);
@@ -58,13 +57,12 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
@Override
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
+
}
@Override
@AfterClass()
public void afterClass() {
- _zkClient.close();
}
@Test()
@@ -72,9 +70,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
setupCluster();
String controllerMsgPath =
HelixUtil.getControllerPropertyPath(CLUSTER_NAME, PropertyType.MESSAGES_CONTROLLER);
- _zkClient.deleteRecursive(controllerMsgPath);
+ _gZkClient.deleteRecursive(controllerMsgPath);
HelixManager manager = null;
- ;
try {
manager =
@@ -106,7 +103,7 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
setupCluster();
String stateModelPath = HelixUtil.getStateModelDefinitionPath(CLUSTER_NAME);
- _zkClient.deleteRecursive(stateModelPath);
+ _gZkClient.deleteRecursive(stateModelPath);
try {
manager =
@@ -125,7 +122,7 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
String instanceStatusUpdatePath =
HelixUtil.getInstancePropertyPath(CLUSTER_NAME, "localhost_" + (START_PORT + 1),
PropertyType.STATUSUPDATES);
- _zkClient.deleteRecursive(instanceStatusUpdatePath);
+ _gZkClient.deleteRecursive(instanceStatusUpdatePath);
try {
manager =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
index 43cfa5a..5467932 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
@@ -32,18 +32,6 @@ import org.testng.annotations.Test;
public class TestCustomIdealState extends ZkIntegrationTestBase {
private static Logger LOG = Logger.getLogger(TestCustomIdealState.class);
- ZkClient _zkClient;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- }
-
- @AfterClass
- public void afterClass() {
- _zkClient.close();
- }
@Test
public void testBasic() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 7811c0d..f797d0f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -81,12 +81,12 @@ public class TestCustomizedIdealStateRebalancer extends
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, db2));
Assert.assertTrue(result);
Thread.sleep(1000);
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
Assert.assertEquals(ev.getPartitionSet().size(), 60);
@@ -116,7 +116,7 @@ public class TestCustomizedIdealStateRebalancer extends
public boolean verify() {
try {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
Builder keyBuilder = accessor.keyBuilder();
int numberOfPartitions =
accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
index 3341e6b..4744508 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
@@ -27,10 +27,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.ClusterSetup;
@@ -52,7 +52,7 @@ public class TestDisable extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -72,14 +72,15 @@ public class TestDisable extends ZkIntegrationTestBase {
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -120,7 +121,6 @@ public class TestDisable extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
@@ -140,7 +140,7 @@ public class TestDisable extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -152,14 +152,15 @@ public class TestDisable extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -200,7 +201,6 @@ public class TestDisable extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
@@ -219,7 +219,7 @@ public class TestDisable extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -239,14 +239,15 @@ public class TestDisable extends ZkIntegrationTestBase {
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -292,7 +293,6 @@ public class TestDisable extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
@@ -311,7 +311,7 @@ public class TestDisable extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -323,14 +323,15 @@ public class TestDisable extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -376,7 +377,6 @@ public class TestDisable extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index 93c765c..be523d0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -38,7 +38,7 @@ public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerChe
ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_12918", true);
result =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index 3067a0b..ba7e8e4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -56,7 +56,7 @@ public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServ
TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
- ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.enablePartition(true, CLUSTER_NAME, "localhost_12919", "TestDB", Arrays.asList("TestDB_9"));
result =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
index 29627a2..e3b0cc7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
@@ -25,10 +25,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -77,11 +77,10 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
"LeaderStandby", true); // do rebalance
// start distributed cluster controllers
- ClusterController[] controllers = new ClusterController[n + n];
+ ClusterDistributedController[] controllers = new ClusterDistributedController[n + n];
for (int i = 0; i < n; i++) {
controllers[i] =
- new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED.toString());
+ new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
controllers[i].syncStart();
}
@@ -92,11 +91,11 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Controller cluster NOT in ideal state");
// start first cluster
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
final String firstClusterName = clusterNamePrefix + "0_0";
for (int i = 0; i < n; i++) {
String instanceName = "localhost0_" + (12918 + i);
- participants[i] = new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, firstClusterName, instanceName);
participants[i].syncStart();
}
@@ -114,8 +113,7 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
for (int i = n; i < 2 * n; i++) {
controllers[i] =
- new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED.toString());
+ new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
controllers[i].syncStart();
}
@@ -157,7 +155,6 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
System.out.println("Cleaning up...");
- Thread.sleep(2000);
for (int i = 0; i < 5; i++) {
result =
ClusterStateVerifier
@@ -166,7 +163,6 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
controllers[i].syncStop();
}
- // Thread.sleep(2000);
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
index ed60971..3cf4ed5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
@@ -25,10 +25,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -76,11 +76,10 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
"LeaderStandby", true); // do rebalance
// start distributed cluster controllers
- ClusterController[] controllers = new ClusterController[n];
+ ClusterDistributedController[] controllers = new ClusterDistributedController[n];
for (int i = 0; i < n; i++) {
controllers[i] =
- new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED.toString());
+ new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
controllers[i].syncStart();
}
@@ -91,11 +90,11 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
Assert.assertTrue(result, "Controller cluster NOT in ideal state");
// start first cluster
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
final String firstClusterName = clusterNamePrefix + "0_0";
for (int i = 0; i < n; i++) {
String instanceName = "localhost0_" + (12918 + i);
- participants[i] = new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, firstClusterName, instanceName);
participants[i].syncStart();
}
@@ -114,11 +113,11 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
controllers[j].syncStop();
// setup the second cluster
- MockParticipant[] participants2 = new MockParticipant[n];
+ MockParticipantManager[] participants2 = new MockParticipantManager[n];
final String secondClusterName = clusterNamePrefix + "0_1";
for (int i = 0; i < n; i++) {
String instanceName = "localhost1_" + (12918 + i);
- participants2[i] = new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
+ participants2[i] = new MockParticipantManager(ZK_ADDR, secondClusterName, instanceName);
participants2[i].syncStart();
}
@@ -130,7 +129,6 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
System.out.println("Cleaning up...");
- Thread.sleep(1000);
for (int i = 0; i < 5; i++) {
result =
ClusterStateVerifier
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
index 951607b..4dad7c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -33,8 +33,9 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState.IdealStateProperty;
@@ -79,10 +80,8 @@ public class TestDriver {
public final int _numNode;
public final int _replica;
- // public final Map<String, ZNRecord> _idealStateMap = new
- // ConcurrentHashMap<String, ZNRecord>();
- public final Map<String, StartCMResult> _startCMResultMap =
- new ConcurrentHashMap<String, StartCMResult>();
+ public final Map<String, HelixManager> _managers =
+ new ConcurrentHashMap<String, HelixManager>();
public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
int numNode, int replica) {
@@ -118,10 +117,6 @@ public class TestDriver {
replica, true);
}
- // public static void setupCluster(String uniqTestName, ZkClient zkClient, int
- // numDb,
- // int numPartitionPerDb, int numNodes, int replica, boolean doRebalance)
- // throws Exception
public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
throws Exception {
@@ -193,11 +188,15 @@ public class TestDriver {
for (int id : instanceIds) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
- if (testInfo._startCMResultMap.containsKey(instanceName)) {
+ // if (testInfo._startCMResultMap.containsKey(instanceName)) {
+ if (testInfo._managers.containsKey(instanceName)) {
LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
} else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
- testInfo._startCMResultMap.put(instanceName, result);
+ // StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ testInfo._managers.put(instanceName, participant);
// testInfo._instanceStarted.countDown();
}
}
@@ -220,13 +219,13 @@ public class TestDriver {
for (int id : nodeIds) {
String controllerName = CONTROLLER_PREFIX + "_" + id;
- if (testInfo._startCMResultMap.containsKey(controllerName)) {
+ if (testInfo._managers.containsKey(controllerName)) {
LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
} else {
- StartCMResult result =
- TestHelper.startController(clusterName, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- testInfo._startCMResultMap.put(controllerName, result);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, controllerName);
+ controller.syncStart();
+ testInfo._managers.put(controllerName, controller);
}
}
}
@@ -257,27 +256,22 @@ public class TestDriver {
TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
// stop controller first
- for (Iterator<Entry<String, StartCMResult>> it =
- testInfo._startCMResultMap.entrySet().iterator(); it.hasNext();) {
- Map.Entry<String, StartCMResult> entry = it.next();
- String instanceName = entry.getKey();
+ for (String instanceName : testInfo._managers.keySet()) {
if (instanceName.startsWith(CONTROLLER_PREFIX)) {
- it.remove();
- HelixManager manager = entry.getValue()._manager;
- manager.disconnect();
- Thread thread = entry.getValue()._thread;
- thread.interrupt();
+ ClusterControllerManager controller =
+ (ClusterControllerManager) testInfo._managers.get(instanceName);
+ controller.syncStop();
}
}
Thread.sleep(1000);
- // stop the rest
- for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet()) {
- HelixManager manager = entry.getValue()._manager;
- manager.disconnect();
- Thread thread = entry.getValue()._thread;
- thread.interrupt();
+ for (String instanceName : testInfo._managers.keySet()) {
+ if (!instanceName.startsWith(CONTROLLER_PREFIX)) {
+ MockParticipantManager participant =
+ (MockParticipantManager) testInfo._managers.get(instanceName);
+ participant.syncStop();
+ }
}
testInfo._zkClient.close();
@@ -292,23 +286,24 @@ public class TestDriver {
}
TestInfo testInfo = _testInfoMap.get(uniqClusterName);
- // String clusterName = testInfo._clusterName;
String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
- StartCMResult result = testInfo._startCMResultMap.remove(failHost);
+ MockParticipantManager participant =
+ (MockParticipantManager) testInfo._managers.remove(failHost);
// TODO need sync
- if (result == null || result._manager == null || result._thread == null) {
+ if (participant == null) {
String errMsg = "Dummy participant:" + failHost + " seems not running";
LOG.error(errMsg);
} else {
// System.err.println("try to stop participant: " +
// result._manager.getInstanceName());
- NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
- TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
- List<TestCommand> commandList = new ArrayList<TestCommand>();
- commandList.add(command);
- TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+ // NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
+ // TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
+ // List<TestCommand> commandList = new ArrayList<TestCommand>();
+ // commandList.add(command);
+ // TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+ participant.syncStop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index b1fcc60..5b54fad 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -25,27 +25,21 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -60,7 +54,7 @@ public class TestDrop extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -72,7 +66,8 @@ public class TestDrop extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
@@ -84,11 +79,10 @@ public class TestDrop extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
- errTransitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errTransitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -114,13 +108,10 @@ public class TestDrop extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
- // wait for all zk callbacks done
- // Thread.sleep(1000);
- // controller.syncStop();
- // for (int i = 0; i < 5; i++)
- // {
- // participants[i].syncStop();
- // }
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -135,7 +126,7 @@ public class TestDrop extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -147,7 +138,8 @@ public class TestDrop extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
@@ -159,11 +151,10 @@ public class TestDrop extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
- errTransitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errTransitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -198,13 +189,10 @@ public class TestDrop extends ZkIntegrationTestBase {
Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
// clean up
- // wait for all zk callbacks done
- // Thread.sleep(1000);
- // controller.syncStop();
- // for (int i = 0; i < 5; i++)
- // {
- // participants[i].syncStop();
- // }
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -219,7 +207,7 @@ public class TestDrop extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -246,7 +234,8 @@ public class TestDrop extends ZkIntegrationTestBase {
accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
@@ -257,11 +246,10 @@ public class TestDrop extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
- errTransitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errTransitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -285,6 +273,12 @@ public class TestDrop extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result, "Should be empty exeternal-view");
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -298,7 +292,7 @@ public class TestDrop extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -310,14 +304,15 @@ public class TestDrop extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -357,10 +352,8 @@ public class TestDrop extends ZkIntegrationTestBase {
"schemata externalView should be empty but was \"" + extView + "\"");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index ba88370..0d02d12 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -20,7 +20,7 @@ package org.apache.helix.integration;
*/
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -59,9 +59,8 @@ public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCh
String hostToKill = "localhost_12920";
- _startCMResultMap.get(hostToKill)._manager.disconnect();
+ _participants[2].syncStop();
Thread.sleep(1000);
- _startCMResultMap.get(hostToKill)._thread.interrupt();
String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
ClusterSetup.processCommandLineArgs(command.split(" "));
@@ -70,8 +69,8 @@ public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCh
TestHelper.<String> setOf("localhost_12918", "localhost_12919",
/* "localhost_12920", */"localhost_12921", "localhost_12922"), ZK_ADDR);
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, hostToKill);
- _startCMResultMap.put(hostToKill, result);
+ _participants[2] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, hostToKill);
+ _participants[2].syncStart();
TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME, "MyDB2",
TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
index 48cabbd..3dc92f7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
@@ -24,17 +24,20 @@ import java.util.Date;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
+ private static Logger LOG = Logger.getLogger(TestEnablePartitionDuringDisable.class);
+
static {
// Logger.getRootLogger().setLevel(Level.INFO);
}
@@ -63,8 +66,7 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception in cluster setup", e);
}
} else if (slaveToOfflineCnt > 0 && fromState.equals("OFFLINE") && toState.equals("SLAVE")) {
@@ -93,19 +95,21 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
EnablePartitionTransition transition = new EnablePartitionTransition();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
if (instanceName.equals("localhost_12919")) {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, transition);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(transition);
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -138,8 +142,6 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
Assert.assertEquals(transition.offlineToSlave, 1, "should get 1 offlineToSlave transition");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
index bebefd1..71c4339 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -26,9 +26,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -38,7 +38,7 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
@Test()
public void testErrorPartition() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START testErrorPartition() at " + new Date(System.currentTimeMillis()));
ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
@@ -46,8 +46,10 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3,
"MasterSlave", true);
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
@@ -57,14 +59,12 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
}
};
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
@@ -113,7 +113,7 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName));
Assert.assertTrue(result);
- participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR);
+ participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
new Thread(participants[0]).start();
result =
@@ -121,6 +121,12 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
ZK_ADDR, clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END testErrorPartition() at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
index 26da639..b3c34e4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
@@ -26,9 +26,9 @@ import java.util.List;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
@@ -42,7 +42,7 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
System.out.println("START testExternalViewUpdates at " + new Date(System.currentTimeMillis()));
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
int resourceNb = 10;
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -53,13 +53,15 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
1, // replicas
"MasterSlave", true); // do rebalance
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -90,8 +92,11 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
Assert.assertTrue(stat.getVersion() <= 2, "ExternalView should be updated at most 2 times");
}
- // TODO: need stop controller and participants
-
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END testExternalViewUpdates at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
index b2bb561..f6a7098 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -27,21 +27,23 @@ import org.apache.helix.TestHelper;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockJobIntf;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.participant.CustomCodeCallbackHandler;
import org.apache.helix.participant.HelixCustomCodeRunner;
import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
+ private static Logger LOG = Logger.getLogger(TestHelixCustomCodeRunner.class);
+
private final String _clusterName = "CLUSTER_" + getShortClassName();
private final int _nodeNb = 5;
private final int _startPort = 12918;
@@ -74,8 +76,7 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
.usingLeaderStandbyModel("TestParticLeader").start();
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception do pre-connect job", e);
}
}
@@ -100,16 +101,19 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
_nodeNb, // replica
"MasterSlave", true);
- TestHelper.startController(_clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+ controller.syncStart();
- MockParticipant[] partics = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < _nodeNb; i++) {
String instanceName = "localhost_" + (_startPort + i);
- partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR, null, new MockJob());
- partics[i].syncStart();
- // new Thread(partics[i]).start();
+ MockJob job = new MockJob();
+ participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+
+ job.doPreConnectJob(participants[i]);
+ participants[i].syncStart();
}
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -121,20 +125,26 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
_callback._isCallbackInvoked = false;
// add a new live instance
- ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
+ // ZkClient zkClient = new ZkClient(ZK_ADDR);
+ // zkClient.setZkSerializer(new ZNRecordSerializer());
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
- newLiveIns.setHelixVersion("0.0.0");
+ newLiveIns.setHelixVersion("0.6.0");
newLiveIns.setSessionId("randomSessionId");
accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
Thread.sleep(1000); // wait for the CALLBACK type callback to finish
Assert.assertTrue(_callback._isCallbackInvoked);
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < _nodeNb; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
index 4484386..1f906d0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
@@ -33,8 +33,7 @@ import org.testng.annotations.Test;
public class TestHelixInstanceTag extends ZkStandAloneCMTestBase {
@Test
public void testInstanceTag() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
HelixDataAccessor accessor = manager.getHelixDataAccessor();
String DB2 = "TestDB2";
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
index f69aeec..2761634 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
@@ -2,8 +2,7 @@ package org.apache.helix.integration;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -35,8 +34,7 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
@Test
public void testInstanceAutoJoin() throws Exception {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[0];
HelixDataAccessor accessor = manager.getHelixDataAccessor();
_setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "OnlineOffline", RebalanceMode.FULL_AUTO
@@ -44,10 +42,13 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 1);
String instance2 = "localhost_279699";
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+ // StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance2);
+ newParticipant.syncStart();
Thread.sleep(500);
- Assert.assertFalse(result._thread.isAlive());
+ // Assert.assertFalse(result._thread.isAlive());
Assert.assertTrue(null == manager.getHelixDataAccessor().getProperty(
accessor.keyBuilder().liveInstance(instance2)));
@@ -55,12 +56,11 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
manager.getConfigAccessor().set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
- result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
-
- StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+ newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance2);
+ newParticipant.syncStart();
Thread.sleep(500);
- Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive());
+ // Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive());
for (int i = 0; i < 20; i++) {
if (null == manager.getHelixDataAccessor().getProperty(
accessor.keyBuilder().liveInstance(instance2))) {
@@ -71,9 +71,6 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
Assert.assertTrue(null != manager.getHelixDataAccessor().getProperty(
accessor.keyBuilder().liveInstance(instance2)));
- result._manager.disconnect();
- result2._manager.disconnect();
- result._thread.interrupt();
- result2._thread.interrupt();
+ newParticipant.syncStop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
index 837d0a1..8768760 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
@@ -28,9 +28,9 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -40,6 +40,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.testng.Assert;
+import org.testng.annotations.Test;
// Helix-50: integration test for generate message based on state priority
public class TestInvalidAutoIdealState extends ZkUnitTestBase {
@@ -90,15 +91,16 @@ public class TestInvalidAutoIdealState extends ZkUnitTestBase {
admin.setResourceIdealState(clusterName, "TestDB", idealState);
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
boolean result =
@@ -107,7 +109,7 @@ public class TestInvalidAutoIdealState extends ZkUnitTestBase {
Assert.assertTrue(result);
// make sure localhost_12919 is master on TestDB_1
- HelixDataAccessor accessor = controller.getManager().getHelixDataAccessor();
+ HelixDataAccessor accessor = controller.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
ExternalView extView = accessor.getProperty(keyBuilder.externalView(db));
Map<String, String> stateMap = extView.getStateMap(db + "_1");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
index 487e689..745f2f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
@@ -37,9 +37,9 @@ import org.testng.annotations.Test;
public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase {
@Test
public void testStateMismatch() throws InterruptedException {
- String controllerName = CONTROLLER_PREFIX + "_0";
+ // String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
HelixDataAccessor accessor = manager.getHelixDataAccessor();
Builder kb = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
index fbff4e1..fc6d6bc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -21,8 +21,6 @@ package org.apache.helix.integration;
import java.util.Date;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkChildListener;
@@ -31,17 +29,13 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.builder.ClusterConstraintsBuilder;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -55,8 +49,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
// Logger.getRootLogger().setLevel(Level.INFO);
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
- // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -120,13 +113,15 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
});
}
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -143,6 +138,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
Assert.assertTrue(success.get());
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index cdd644b..7d66780 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -32,7 +32,6 @@ import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
@@ -245,8 +244,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
public void start() throws Exception {
helixManager =
- HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
- InstanceType.PARTICIPANT, ZK_ADDR);
+ new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
{
// hack to set sessionTimeout
Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout");