You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:24 UTC
[09/52] [abbrv] git commit: [HELIX-279] Apply gc handling fixes to
ZKHelixManager
[HELIX-279] Apply gc handling fixes to ZKHelixManager
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/f8e3b1af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/f8e3b1af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/f8e3b1af
Branch: refs/heads/helix-yarn
Commit: f8e3b1af5c94b779d4244137131dbabc593ca0e8
Parents: 18a8c7c
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 13:09:24 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 13:09:24 2013 -0800
----------------------------------------------------------------------
.../tools/TestHelixAdminScenariosRest.java | 889 +++++++++------
.../apache/helix/tools/TestResetInstance.java | 21 +-
.../helix/tools/TestResetPartitionState.java | 24 +-
.../apache/helix/tools/TestResetResource.java | 22 +-
.../org/apache/helix/agent/TestHelixAgent.java | 27 +-
.../java/org/apache/helix/HelixManager.java | 7 +
.../java/org/apache/helix/HelixProperty.java | 6 +-
.../rebalancer/SemiAutoRebalancer.java | 3 +
.../util/ConstraintBasedAssignment.java | 55 +-
.../stages/BestPossibleStateCalcStage.java | 64 +-
.../stages/CurrentStateComputationStage.java | 13 +-
.../stages/MessageGenerationStage.java | 2 +-
.../stages/MessageSelectionStage.java | 9 +-
.../controller/stages/MessageThrottleStage.java | 11 +-
.../controller/stages/ResourceCurrentState.java | 13 +-
.../controller/stages/TaskAssignmentStage.java | 12 +-
.../helix/manager/zk/AbstractManager.java | 691 ------------
.../helix/manager/zk/ControllerManager.java | 174 ---
.../manager/zk/ControllerManagerHelper.java | 7 +-
.../DefaultSchedulerMessageHandlerFactory.java | 1 +
.../zk/DistributedControllerManager.java | 190 ----
.../manager/zk/DistributedLeaderElection.java | 16 +-
.../manager/zk/HelixConnectionAdaptor.java | 6 +
.../helix/manager/zk/ParticipantManager.java | 153 ---
.../manager/zk/ParticipantManagerHelper.java | 27 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 1041 +++++++++---------
.../helix/manager/zk/ZkAsyncCallbacks.java | 3 +-
.../helix/manager/zk/ZkStateChangeListener.java | 127 ---
.../messaging/DefaultMessagingService.java | 4 +-
.../apache/helix/model/ResourceAssignment.java | 19 +-
.../src/test/java/org/apache/helix/Mocks.java | 6 +
.../test/java/org/apache/helix/TestHelper.java | 97 +-
.../org/apache/helix/TestZkClientWrapper.java | 45 +-
.../java/org/apache/helix/TestZnodeModify.java | 3 +-
.../org/apache/helix/ZkHelixTestManager.java | 44 -
.../org/apache/helix/api/TestNewStages.java | 12 +-
.../controller/stages/DummyClusterManager.java | 6 +
.../stages/TestMessageThrottleStage.java | 30 +-
.../stages/TestParseInfoFromAlert.java | 5 +-
.../stages/TestRebalancePipeline.java | 19 +-
.../strategy/TestShufflingTwoStateStrategy.java | 68 +-
.../helix/healthcheck/TestAddDropAlert.java | 35 +-
.../healthcheck/TestAlertActionTriggering.java | 21 +-
.../helix/healthcheck/TestAlertFireHistory.java | 38 +-
.../helix/healthcheck/TestDummyAlerts.java | 14 +-
.../helix/healthcheck/TestExpandAlert.java | 40 +-
.../helix/healthcheck/TestSimpleAlert.java | 40 +-
.../healthcheck/TestSimpleWildcardAlert.java | 42 +-
.../helix/healthcheck/TestStalenessAlert.java | 36 +-
.../helix/healthcheck/TestWildcardAlert.java | 36 +-
.../helix/integration/TestAddClusterV2.java | 78 +-
.../TestAddNodeAfterControllerStart.java | 56 +-
.../TestAddStateModelFactoryAfterConnect.java | 14 +-
.../integration/TestAutoIsWithEmptyMap.java | 14 +-
.../helix/integration/TestAutoRebalance.java | 64 +-
.../TestAutoRebalancePartitionLimit.java | 102 +-
.../helix/integration/TestBatchMessage.java | 50 +-
.../integration/TestBatchMessageWrapper.java | 18 +-
.../integration/TestBucketizedResource.java | 15 +-
.../integration/TestCarryOverBadCurState.java | 19 +-
.../integration/TestCleanupExternalView.java | 20 +-
.../helix/integration/TestClusterStartsup.java | 15 +-
.../helix/integration/TestCustomIdealState.java | 12 -
.../TestCustomizedIdealStateRebalancer.java | 4 +-
.../apache/helix/integration/TestDisable.java | 36 +-
.../helix/integration/TestDisableNode.java | 2 +-
.../helix/integration/TestDisablePartition.java | 2 +-
.../integration/TestDistributedCMMain.java | 18 +-
.../TestDistributedClusterController.java | 18 +-
.../apache/helix/integration/TestDriver.java | 80 +-
.../org/apache/helix/integration/TestDrop.java | 150 ++-
.../helix/integration/TestDropResource.java | 9 +-
.../TestEnablePartitionDuringDisable.java | 22 +-
.../helix/integration/TestErrorPartition.java | 28 +-
.../integration/TestExternalViewUpdates.java | 21 +-
.../integration/TestHelixCustomCodeRunner.java | 40 +-
.../helix/integration/TestHelixInstanceTag.java | 3 +-
.../helix/integration/TestInstanceAutoJoin.java | 25 +-
.../integration/TestInvalidAutoIdealState.java | 14 +-
.../TestMessagePartitionStateMismatch.java | 4 +-
.../helix/integration/TestMessageThrottle.java | 16 +-
.../helix/integration/TestMessageThrottle2.java | 4 +-
.../helix/integration/TestMessagingService.java | 81 +-
.../integration/TestNonOfflineInitState.java | 28 +-
.../helix/integration/TestNullReplica.java | 15 +-
.../TestParticipantErrorMessage.java | 14 +-
.../TestParticipantNameCollision.java | 9 +-
.../helix/integration/TestPauseSignal.java | 15 +-
.../integration/TestRedefineStateModelDef.java | 13 +-
.../helix/integration/TestRenamePartition.java | 45 +-
.../helix/integration/TestResetInstance.java | 18 +-
.../integration/TestResetPartitionState.java | 17 +-
.../helix/integration/TestResetResource.java | 18 +-
.../integration/TestRestartParticipant.java | 41 +-
.../helix/integration/TestSchedulerMessage.java | 586 +++++++---
.../integration/TestSchedulerMsgContraints.java | 254 -----
.../integration/TestSchedulerMsgUsingQueue.java | 181 ---
.../helix/integration/TestSchemataSM.java | 14 +-
.../TestSessionExpiryInTransition.java | 29 +-
.../helix/integration/TestStandAloneCMMain.java | 40 +-
.../TestStandAloneCMSessionExpiry.java | 22 +-
...estStartMultipleControllersWithSameName.java | 9 +-
.../integration/TestStateTransitionTimeout.java | 38 +-
.../helix/integration/TestSwapInstance.java | 16 +-
.../integration/TestZkCallbackHandlerLeak.java | 89 +-
.../integration/ZkIntegrationTestBase.java | 49 +-
.../integration/ZkStandAloneCMTestBase.java | 63 +-
...dAloneCMTestBaseWithPropertyServerCheck.java | 25 +-
.../manager/ClusterControllerManager.java | 17 +-
.../manager/ClusterDistributedController.java | 20 +-
.../manager/MockParticipantManager.java | 8 +-
.../TestDistributedControllerManager.java | 9 +-
.../manager/TestParticipantManager.java | 26 +-
.../apache/helix/josql/TestJosqlProcessor.java | 8 +-
.../TestDefaultControllerMsgHandlerFactory.java | 9 +-
.../helix/manager/zk/TestHandleNewSession.java | 16 +-
.../manager/zk/TestLiveInstanceBounce.java | 20 +-
.../zk/TestZKPropertyTransferServer.java | 21 +-
.../helix/manager/zk/TestZkClusterManager.java | 16 +-
.../apache/helix/manager/zk/TestZkFlapping.java | 14 +-
.../zk/TestZkManagerFlappingDetection.java | 117 +-
.../manager/zk/TestZkStateChangeListener.java | 54 +-
.../handling/TestConfigThreadpoolSize.java | 10 +-
.../handling/TestResourceThreadpoolSize.java | 8 +-
.../mock/controller/ClusterController.java | 127 ---
.../MockHealthReportParticipant.java | 16 +-
.../helix/mock/participant/MockParticipant.java | 181 ---
.../helix/participant/MockZKHelixManager.java | 6 +
.../TestDistControllerStateModel.java | 13 +-
.../apache/helix/tools/TestHelixAdminCli.java | 162 +--
.../helix/userdefinedrebalancer/Lock.java | 5 +
131 files changed, 3175 insertions(+), 4684 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
index 04a70d2..6a0e331 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
@@ -19,24 +19,19 @@ package org.apache.helix.tools;
* under the License.
*/
-/*
- * Simulate all the admin tasks needed by using command line tool
- *
- * */
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState.IdealStateProperty;
@@ -64,8 +59,10 @@ import org.restlet.representation.Representation;
import org.testng.Assert;
import org.testng.annotations.Test;
+/**
+ * Simulate all the admin tasks needed by using command line tool
+ */
public class TestHelixAdminScenariosRest extends AdminTestBase {
- Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
RestAdminApplication _adminApp;
Component _component;
String _tag1 = "tag1123";
@@ -90,55 +87,6 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
return mapper.readValue(sr, clazz);
}
- @Test
- public void testAddDeleteClusterAndInstanceAndResource() throws Exception {
- // Helix bug helix-102
- // ZKPropertyTransferServer.PERIOD = 500;
- // ZkPropertyTransferClient.SEND_PERIOD = 500;
- // ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
-
- /** ======================= Add clusters ============================== */
-
- testAddCluster();
-
- /** ================= Add / drop some resources =========================== */
-
- testAddResource();
-
- /** ====================== Add / delete instances =========================== */
-
- testAddInstance();
-
- /** ===================== Rebalance resource =========================== */
-
- testRebalanceResource();
-
- /** ==================== start the clusters ============================= */
-
- testStartCluster();
-
- /** ==================== drop add resource in live clusters =================== */
- testDropAddResource();
-
- /** ======================Operations with live node ============================ */
-
- testInstanceOperations();
-
- /** ======================Operations with partitions ============================ */
-
- testEnablePartitions();
-
- /** ============================ expand cluster =========================== */
-
- testExpandCluster();
-
- /** ============================ deactivate cluster =========================== */
- testDeactivateCluster();
-
- // wait all zk callbacks done
- Thread.sleep(1000);
- }
-
static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
boolean hasException) throws IOException {
Reference resourceRef = new Reference(url);
@@ -226,41 +174,46 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
Assert.assertTrue(exceptionThrown);
}
+ private Map<String, String> addClusterCmd(String clusterName) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.CLUSTER_NAME, clusterName);
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
+
+ return parameters;
+ }
+
+ private void addCluster(String clusterName) throws IOException {
+ String url = "http://localhost:" + ADMIN_PORT + "/clusters";
+ String response = assertSuccessPostOperation(url, addClusterCmd(clusterName), false);
+ Assert.assertTrue(response.contains(clusterName));
+ }
+
+ @Test
public void testAddCluster() throws Exception {
String url = "http://localhost:" + ADMIN_PORT + "/clusters";
- Map<String, String> paraMap = new HashMap<String, String>();
// Normal add
- paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
-
- String response = assertSuccessPostOperation(url, paraMap, false);
+ String response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), false);
Assert.assertTrue(response.contains("clusterTest"));
// malformed cluster name
- paraMap.put(JsonParameters.CLUSTER_NAME, "/ClusterTest");
- response = assertSuccessPostOperation(url, paraMap, true);
+ response = assertSuccessPostOperation(url, addClusterCmd("/ClusterTest"), true);
// Add the grand cluster
- paraMap.put(JsonParameters.CLUSTER_NAME, "Klazt3rz");
- response = assertSuccessPostOperation(url, paraMap, false);
+ response = assertSuccessPostOperation(url, addClusterCmd("Klazt3rz"), false);
Assert.assertTrue(response.contains("Klazt3rz"));
- paraMap.put(JsonParameters.CLUSTER_NAME, "\\ClusterTest");
- response = assertSuccessPostOperation(url, paraMap, false);
+ response = assertSuccessPostOperation(url, addClusterCmd("\\ClusterTest"), false);
Assert.assertTrue(response.contains("\\ClusterTest"));
// Add already exist cluster
- paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest");
- response = assertSuccessPostOperation(url, paraMap, true);
+ response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), true);
// delete cluster without resource and instance
Assert.assertTrue(ZKUtil.isClusterSetup("Klazt3rz", _gZkClient));
Assert.assertTrue(ZKUtil.isClusterSetup("clusterTest", _gZkClient));
Assert.assertTrue(ZKUtil.isClusterSetup("\\ClusterTest", _gZkClient));
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.dropCluster);
-
String clusterUrl = getClusterUrl("\\ClusterTest");
deleteUrl(clusterUrl, false);
@@ -284,94 +237,180 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
Assert.assertFalse(_gZkClient.exists("/clusterTestOK"));
- paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest1");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
- response = assertSuccessPostOperation(url, paraMap, false);
+ response = assertSuccessPostOperation(url, addClusterCmd("clusterTest1"), false);
response = getUrl(clustersUrl);
Assert.assertTrue(response.contains("clusterTest1"));
}
- public void testAddResource() throws Exception {
- String reourcesUrl = "http://localhost:" + ADMIN_PORT + "/clusters/clusterTest1/resourceGroups";
+ private Map<String, String> addResourceCmd(String resourceName, String stateModelDef,
+ int partition) {
+ Map<String, String> parameters = new HashMap<String, String>();
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_22");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
- paraMap.put(JsonParameters.PARTITIONS, "144");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
+ parameters.put(JsonParameters.RESOURCE_GROUP_NAME, resourceName);
+ parameters.put(JsonParameters.STATE_MODEL_DEF_REF, stateModelDef);
+ parameters.put(JsonParameters.PARTITIONS, "" + partition);
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
- String response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
- Assert.assertTrue(response.contains("db_22"));
+ return parameters;
+ }
+
+ private void addResource(String clusterName, String resourceName, int partitions)
+ throws IOException {
+ final String reourcesUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+ String response =
+ assertSuccessPostOperation(reourcesUrl,
+ addResourceCmd(resourceName, "MasterSlave", partitions), false);
+ Assert.assertTrue(response.contains(resourceName));
+ }
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
- paraMap.put(JsonParameters.PARTITIONS, "44");
+ @Test
+ public void testAddResource() throws Exception {
+ final String clusterName = "clusterTestAddResource";
+ addCluster(clusterName);
+
+ String reourcesUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+ String response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "MasterSlave", 144), false);
+ Assert.assertTrue(response.contains("db_22"));
- response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_11"));
// Add duplicate resource
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_22");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "OnlineOffline");
- paraMap.put(JsonParameters.PARTITIONS, "55");
-
- response = assertSuccessPostOperation(reourcesUrl, paraMap, true);
+ response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "OnlineOffline", 55), true);
// drop resource now
- String resourceUrl = getResourceUrl("clusterTest1", "db_11");
+ String resourceUrl = getResourceUrl(clusterName, "db_11");
deleteUrl(resourceUrl, false);
- Assert.assertFalse(_gZkClient.exists("/clusterTest1/IDEALSTATES/db_11"));
+ Assert.assertFalse(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
- paraMap.put(JsonParameters.PARTITIONS, "44");
- response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_11"));
- Assert.assertTrue(_gZkClient.exists("/clusterTest1/IDEALSTATES/db_11"));
+ Assert.assertTrue(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_33");
- response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_33", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_33"));
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_44");
- response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_44", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_44"));
}
- private void testDeactivateCluster() throws Exception, InterruptedException {
- HelixDataAccessor accessor;
- String path;
- // deactivate cluster
- String clusterUrl = getClusterUrl("clusterTest1");
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.ENABLED, "false");
- paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
+ private Map<String, String> activateClusterCmd(String grandClusterName, boolean enabled) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.GRAND_CLUSTER, grandClusterName);
+ parameters.put(JsonParameters.ENABLED, "" + enabled);
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
- String response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+ return parameters;
+ }
+
+ @Test
+ public void testDeactivateCluster() throws Exception {
+ final String clusterName = "clusterTestDeactivateCluster";
+ final String controllerClusterName = "controllerClusterTestDeactivateCluster";
+
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ Map<String, ClusterDistributedController> distControllers =
+ new HashMap<String, ClusterDistributedController>();
+
+ // setup cluster
+ addCluster(clusterName);
+ addInstancesToCluster(clusterName, "localhost:123", 6, null);
+ addResource(clusterName, "db_11", 16);
+ rebalanceResource(clusterName, "db_11");
+
+ addCluster(controllerClusterName);
+ addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
+
+ // start mock nodes
+ for (int i = 0; i < 6; i++) {
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
+ }
+
+ // start controller nodes
+ for (int i = 0; i < 2; i++) {
+ String controllerName = "controller_900" + i;
+ ClusterDistributedController distController =
+ new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
+ distController.syncStart();
+ distControllers.put(controllerName, distController);
+ }
+
+ String clusterUrl = getClusterUrl(clusterName);
+
+ // activate cluster
+ assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
+ boolean verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName));
+ Assert.assertTrue(verifyResult);
+
+ verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(verifyResult);
+
+ // deactivate cluster
+ assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, false), false);
Thread.sleep(6000);
- Assert.assertFalse(_gZkClient.exists("/Klazt3rz/IDEALSTATES/clusterTest1"));
+ Assert.assertFalse(_gZkClient.exists("/" + controllerClusterName + "/IDEALSTATES/"
+ + clusterName));
- accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
- path = accessor.keyBuilder().controllerLeader().getPath();
+ HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
+ String path = accessor.keyBuilder().controllerLeader().getPath();
Assert.assertFalse(_gZkClient.exists(path));
deleteUrl(clusterUrl, true);
+ Assert.assertTrue(_gZkClient.exists("/" + clusterName));
- Assert.assertTrue(_gZkClient.exists("/clusterTest1"));
// leader node should be gone
- for (StartCMResult result : _startCMResultMap.values()) {
- result._manager.disconnect();
- result._thread.interrupt();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
}
deleteUrl(clusterUrl, false);
- Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
+ Assert.assertFalse(_gZkClient.exists("/" + clusterName));
+
+ // clean up
+ for (ClusterDistributedController controller : distControllers.values()) {
+ controller.syncStop();
+ }
+
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
+ }
+
+ private Map<String, String> addIdealStateCmd() {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+
+ return parameters;
}
- private void testDropAddResource() throws Exception {
- ZNRecord record = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+ @Test
+ public void testDropAddResource() throws Exception {
+ final String clusterName = "clusterTestDropAddResource";
+
+ // setup cluster
+ addCluster(clusterName);
+ addResource(clusterName, "db_11", 22);
+ addInstancesToCluster(clusterName, "localhost_123", 6, null);
+ rebalanceResource(clusterName, "db_11");
+ ZNRecord record = _gSetupTool._admin.getResourceIdealState(clusterName, "db_11").getRecord();
String x = ObjectToJson(record);
FileWriter fos = new FileWriter("/tmp/temp.log");
@@ -379,217 +418,370 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
pw.write(x);
pw.close();
- String resourceUrl = getResourceUrl("clusterTest1", "db_11");
- deleteUrl(resourceUrl, false);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+ controller.syncStart();
+ // start mock nodes
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ for (int i = 0; i < 6; i++) {
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
+ }
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
- paraMap.put(JsonParameters.PARTITIONS, "22");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
- String response =
- assertSuccessPostOperation(getClusterUrl("clusterTest1") + "/resourceGroups", paraMap,
- false);
- String idealStateUrl = getResourceUrl("clusterTest1", "db_11") + "/idealState";
- Assert.assertTrue(response.contains("db_11"));
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+ String resourceUrl = getResourceUrl(clusterName, "db_11");
+ deleteUrl(resourceUrl, false);
+
+ verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(verifyResult);
+ addResource(clusterName, "db_11", 22);
+
+ String idealStateUrl = getResourceUrl(clusterName, "db_11") + "/idealState";
Map<String, String> extraform = new HashMap<String, String>();
extraform.put(JsonParameters.NEW_IDEAL_STATE, x);
- response = assertSuccessPostOperation(idealStateUrl, paraMap, extraform, false);
+ assertSuccessPostOperation(idealStateUrl, addIdealStateCmd(), extraform, false);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
- ZNRecord record2 =
- _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+ ZNRecord record2 = _gSetupTool._admin.getResourceIdealState(clusterName, "db_11").getRecord();
Assert.assertTrue(record2.equals(record));
+
+ // clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
}
- private void testExpandCluster() throws Exception {
- boolean verifyResult;
+ private Map<String, String> addInstanceCmd(String instances) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.INSTANCE_NAMES, instances);
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
- String clusterUrl = getClusterUrl("clusterTest1");
- String instancesUrl = clusterUrl + "/instances";
+ return parameters;
+ }
+
+ private Map<String, String> expandClusterCmd() {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.expandCluster);
+
+ return parameters;
+ }
+
+ @Test
+ public void testExpandCluster() throws Exception {
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.INSTANCE_NAMES,
- "localhost:12331;localhost:12341;localhost:12351;localhost:12361");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+ final String clusterName = "clusterTestExpandCluster";
- String response = assertSuccessPostOperation(instancesUrl, paraMap, false);
- String[] hosts = "localhost:12331;localhost:12341;localhost:12351;localhost:12361".split(";");
+ // setup cluster
+ addCluster(clusterName);
+ addInstancesToCluster(clusterName, "localhost:123", 6, null);
+ addResource(clusterName, "db_11", 22);
+ rebalanceResource(clusterName, "db_11");
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+ controller.syncStart();
+
+ // start mock nodes
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ for (int i = 0; i < 6; i++) {
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
+ }
+
+ boolean verifyResult =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(verifyResult);
+
+ String clusterUrl = getClusterUrl(clusterName);
+ String instancesUrl = clusterUrl + "/instances";
+
+ String instances = "localhost:12331;localhost:12341;localhost:12351;localhost:12361";
+ String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
+ String[] hosts = instances.split(";");
for (String host : hosts) {
Assert.assertTrue(response.contains(host.replace(':', '_')));
}
- paraMap.clear();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.expandCluster);
- response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+
+ response = assertSuccessPostOperation(clusterUrl, expandClusterCmd(), false);
for (int i = 3; i <= 6; i++) {
- StartCMResult result =
- TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123" + i + "1");
- _startCMResultMap.put("localhost_123" + i + "1", result);
+ String instanceName = "localhost_123" + i + "1";
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
}
verifyResult =
- ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
- "clusterTest1"));
+ ClusterStateVerifier
+ .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
Assert.assertTrue(verifyResult);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
+
+ // clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
}
- private void testEnablePartitions() throws IOException, InterruptedException {
- HelixDataAccessor accessor;
- accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+ private Map<String, String> enablePartitionCmd(String resourceName, String partitions,
+ boolean enabled) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enablePartition);
+ parameters.put(JsonParameters.ENABLED, "" + enabled);
+ parameters.put(JsonParameters.PARTITION, partitions);
+ parameters.put(JsonParameters.RESOURCE, resourceName);
+
+ return parameters;
+ }
+
+ @Test
+ public void testEnablePartitions() throws IOException, InterruptedException {
+ final String clusterName = "clusterTestEnablePartitions";
+
+ // setup cluster
+ addCluster(clusterName);
+ addInstancesToCluster(clusterName, "localhost:123", 6, null);
+ addResource(clusterName, "db_11", 22);
+ rebalanceResource(clusterName, "db_11");
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+ controller.syncStart();
+
+ // start mock nodes
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ for (int i = 0; i < 6; i++) {
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
+ }
+
+ HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
// drop node should fail as not disabled
String hostName = "localhost_1231";
- String instanceUrl = getInstanceUrl("clusterTest1", hostName);
+ String instanceUrl = getInstanceUrl(clusterName, hostName);
ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enablePartition);
- paraMap.put(JsonParameters.ENABLED, "false");
- paraMap.put(JsonParameters.PARTITION, "db_11_0;db_11_15");
- paraMap.put(JsonParameters.RESOURCE, "db_11");
-
- String response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ String response =
+ assertSuccessPostOperation(instanceUrl,
+ enablePartitionCmd("db_11", "db_11_0;db_11_11", false), false);
Assert.assertTrue(response.contains("DISABLED_PARTITION"));
Assert.assertTrue(response.contains("db_11_0"));
- Assert.assertTrue(response.contains("db_11_15"));
+ Assert.assertTrue(response.contains("db_11_11"));
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "OFFLINE");
- Assert.assertEquals(ev.getStateMap("db_11_15").get(hostName), "OFFLINE");
+ Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "OFFLINE");
- paraMap.put(JsonParameters.ENABLED, "true");
- response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(instanceUrl,
+ enablePartitionCmd("db_11", "db_11_0;db_11_11", true), false);
Assert.assertFalse(response.contains("db_11_0"));
- Assert.assertFalse(response.contains("db_11_15"));
+ Assert.assertFalse(response.contains("db_11_11"));
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "MASTER");
- Assert.assertEquals(ev.getStateMap("db_11_15").get(hostName), "SLAVE");
+ Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "SLAVE");
+
+ // clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
+ }
+
+ private Map<String, String> enableInstanceCmd(boolean enabled) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
+ parameters.put(JsonParameters.ENABLED, "" + enabled);
+ return parameters;
}
- private void testInstanceOperations() throws Exception {
+ private Map<String, String> swapInstanceCmd(String oldInstance, String newInstance) {
+ Map<String, String> parameters = new HashMap<String, String>();
+
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
+ parameters.put(JsonParameters.OLD_INSTANCE, oldInstance);
+ parameters.put(JsonParameters.NEW_INSTANCE, newInstance);
+
+ return parameters;
+ }
+
+ @Test
+ public void testInstanceOperations() throws Exception {
+ final String clusterName = "clusterTestInstanceOperations";
+
+ // setup cluster
+ addCluster(clusterName);
+ addInstancesToCluster(clusterName, "localhost:123", 6, null);
+ addResource(clusterName, "db_11", 8);
+ rebalanceResource(clusterName, "db_11");
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+ controller.syncStart();
+
+ // start mock nodes
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ for (int i = 0; i < 6; i++) {
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
+ }
+
HelixDataAccessor accessor;
// drop node should fail as not disabled
- String instanceUrl = getInstanceUrl("clusterTest1", "localhost_1232");
+ String instanceUrl = getInstanceUrl(clusterName, "localhost_1232");
deleteUrl(instanceUrl, true);
// disabled node
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
- paraMap.put(JsonParameters.ENABLED, "false");
- String response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ String response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
Assert.assertTrue(response.contains("false"));
// Cannot drop / swap
deleteUrl(instanceUrl, true);
- String instancesUrl = getClusterUrl("clusterTest1") + "/instances";
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
- paraMap.put(JsonParameters.OLD_INSTANCE, "localhost_1232");
- paraMap.put(JsonParameters.NEW_INSTANCE, "localhost_12320");
- response = assertSuccessPostOperation(instancesUrl, paraMap, true);
+ String instancesUrl = getClusterUrl(clusterName) + "/instances";
+ response =
+ assertSuccessPostOperation(instancesUrl,
+ swapInstanceCmd("localhost_1232", "localhost_12320"), true);
// disconnect the node
- _startCMResultMap.get("localhost_1232")._manager.disconnect();
- _startCMResultMap.get("localhost_1232")._thread.interrupt();
+ participants.get("localhost_1232").syncStop();
// add new node then swap instance
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
- paraMap.put(JsonParameters.INSTANCE_NAME, "localhost_12320");
- response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+ response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost_12320"), false);
Assert.assertTrue(response.contains("localhost_12320"));
// swap instance. The instance get swapped out should not exist anymore
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
- paraMap.put(JsonParameters.OLD_INSTANCE, "localhost_1232");
- paraMap.put(JsonParameters.NEW_INSTANCE, "localhost_12320");
- response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+ response =
+ assertSuccessPostOperation(instancesUrl,
+ swapInstanceCmd("localhost_1232", "localhost_12320"), false);
Assert.assertTrue(response.contains("localhost_12320"));
Assert.assertFalse(response.contains("localhost_1232\""));
- accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+ accessor = participants.get("localhost_1231").getHelixDataAccessor();
String path = accessor.keyBuilder().instanceConfig("localhost_1232").getPath();
Assert.assertFalse(_gZkClient.exists(path));
- _startCMResultMap.put("localhost_12320",
- TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_12320"));
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12320");
+ newParticipant.syncStart();
+ participants.put("localhost_12320", newParticipant);
+
+ boolean verifyResult =
+ ClusterStateVerifier
+ .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
+ Assert.assertTrue(verifyResult);
+
+ // clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
}
- private void testStartCluster() throws Exception, InterruptedException {
+ @Test
+ public void testStartCluster() throws Exception {
+ final String clusterName = "clusterTestStartCluster";
+ final String controllerClusterName = "controllerClusterTestStartCluster";
+
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ Map<String, ClusterDistributedController> distControllers =
+ new HashMap<String, ClusterDistributedController>();
+
+ // setup cluster
+ addCluster(clusterName);
+ addInstancesToCluster(clusterName, "localhost:123", 6, null);
+ addResource(clusterName, "db_11", 8);
+ rebalanceResource(clusterName, "db_11");
+
+ addCluster(controllerClusterName);
+ addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
+
// start mock nodes
for (int i = 0; i < 6; i++) {
- StartCMResult result =
- TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123" + i);
- _startCMResultMap.put("localhost_123" + i, result);
+ String instanceName = "localhost_123" + i;
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ participants.put(instanceName, participant);
}
// start controller nodes
for (int i = 0; i < 2; i++) {
- StartCMResult result =
- TestHelper.startController("Klazt3rz", "controller_900" + i, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED);
-
- _startCMResultMap.put("controller_900" + i, result);
+ String controllerName = "controller_900" + i;
+ ClusterDistributedController distController =
+ new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
+ distController.syncStart();
+ distControllers.put(controllerName, distController);
}
Thread.sleep(100);
// activate clusters
// wrong grand clustername
-
- String clusterUrl = getClusterUrl("clusterTest1");
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.ENABLED, "true");
- paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazters");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
-
- String response = assertSuccessPostOperation(clusterUrl, paraMap, true);
+ String clusterUrl = getClusterUrl(clusterName);
+ assertSuccessPostOperation(clusterUrl, activateClusterCmd("nonExistCluster", true), true);
// wrong cluster name
- clusterUrl = getClusterUrl("clusterTest2");
- paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
- response = assertSuccessPostOperation(clusterUrl, paraMap, true);
+ clusterUrl = getClusterUrl("nonExistCluster");
+ assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), true);
- paraMap.put(JsonParameters.ENABLED, "true");
- paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
- clusterUrl = getClusterUrl("clusterTest1");
- response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+ clusterUrl = getClusterUrl(clusterName);
+ assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
Thread.sleep(500);
deleteUrl(clusterUrl, true);
// verify leader node
- HelixDataAccessor accessor =
- _startCMResultMap.get("controller_9001")._manager.getHelixDataAccessor();
+ HelixDataAccessor accessor = distControllers.get("controller_9001").getHelixDataAccessor();
LiveInstance controllerLeader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
Assert.assertTrue(controllerLeader.getInstanceName().startsWith("controller_900"));
- accessor = _startCMResultMap.get("localhost_1232")._manager.getHelixDataAccessor();
+ accessor = participants.get("localhost_1232").getHelixDataAccessor();
LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
for (int i = 0; i < 5; i++) {
if (leader != null) {
@@ -601,81 +793,98 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
Assert.assertTrue(leader.getInstanceName().startsWith("controller_900"));
boolean verifyResult =
- ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
- "clusterTest1"));
+ ClusterStateVerifier
+ .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
Assert.assertTrue(verifyResult);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- "clusterTest1"));
+ clusterName));
Assert.assertTrue(verifyResult);
+ Thread.sleep(1000);
+
+ // clean up
+ for (ClusterDistributedController controller : distControllers.values()) {
+ controller.syncStop();
+ }
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
+ }
+
+ private Map<String, String> rebalanceCmd(int replicas, String prefix, String tag) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.REPLICAS, "" + replicas);
+ if (prefix != null) {
+ parameters.put(JsonParameters.RESOURCE_KEY_PREFIX, prefix);
+ }
+ if (tag != null) {
+ parameters.put(ClusterSetup.instanceGroupTag, tag);
+ }
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+
+ return parameters;
}
- private void testRebalanceResource() throws Exception {
- String resourceUrl = getResourceUrl("clusterTest1", "db_11");
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.REPLICAS, "3");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+ private void rebalanceResource(String clusterName, String resourceName) throws IOException {
+ String resourceUrl = getResourceUrl(clusterName, resourceName);
+ String idealStateUrl = resourceUrl + "/idealState";
+
+ assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
+ }
+
+ @Test
+ public void testRebalanceResource() throws Exception {
+ // add a normal cluster
+ final String clusterName = "clusterTestRebalanceResource";
+ addCluster(clusterName);
- String ISUrl = resourceUrl + "/idealState";
- String response = assertSuccessPostOperation(ISUrl, paraMap, false);
+ addInstancesToCluster(clusterName, "localhost:123", 3, _tag1);
+ addResource(clusterName, "db_11", 44);
+
+ String resourceUrl = getResourceUrl(clusterName, "db_11");
+
+ String idealStateUrl = resourceUrl + "/idealState";
+ String response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
ZNRecord record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
- Assert
- .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 3);
- Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
- .size() == 3);
+ Assert.assertEquals(record.getListField("db_11_0").size(), 3);
+ Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
deleteUrl(resourceUrl, false);
// re-add and rebalance
- String reourcesUrl = "http://localhost:" + ADMIN_PORT + "/clusters/clusterTest1/resourceGroups";
+ final String reourcesUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+
response = getUrl(reourcesUrl);
Assert.assertFalse(response.contains("db_11"));
- paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
- paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
- paraMap.put(JsonParameters.PARTITIONS, "48");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
-
- response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
- Assert.assertTrue(response.contains("db_11"));
-
- ISUrl = resourceUrl + "/idealState";
- paraMap.put(JsonParameters.REPLICAS, "3");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
- response = assertSuccessPostOperation(ISUrl, paraMap, false);
+ addResource(clusterName, "db_11", 48);
+ idealStateUrl = resourceUrl + "/idealState";
+ response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
- Assert
- .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 3);
- Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
- .size() == 3);
+ Assert.assertEquals(record.getListField("db_11_0").size(), 3);
+ Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
// rebalance with key prefix
- resourceUrl = getResourceUrl("clusterTest1", "db_22");
- ISUrl = resourceUrl + "/idealState";
- paraMap.put(JsonParameters.REPLICAS, "2");
- paraMap.put(JsonParameters.RESOURCE_KEY_PREFIX, "alias");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
- response = assertSuccessPostOperation(ISUrl, paraMap, false);
+ addResource(clusterName, "db_22", 55);
+ resourceUrl = getResourceUrl(clusterName, "db_22");
+ idealStateUrl = resourceUrl + "/idealState";
+ response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", null), false);
record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_22"));
- Assert
- .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 2);
- Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
- .size() == 2);
+ Assert.assertEquals(record.getListField("alias_0").size(), 2);
+ Assert.assertEquals(record.getMapField("alias_0").size(), 2);
Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0])))
.startsWith("alias_"));
Assert.assertFalse(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
- resourceUrl = getResourceUrl("clusterTest1", "db_33");
- ISUrl = resourceUrl + "/idealState";
- paraMap.put(JsonParameters.REPLICAS, "2");
- paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
- paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
- response = assertSuccessPostOperation(ISUrl, paraMap, false);
+
+ addResource(clusterName, "db_33", 44);
+ resourceUrl = getResourceUrl(clusterName, "db_33");
+ idealStateUrl = resourceUrl + "/idealState";
+ response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, null, _tag1), false);
Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
Assert.assertTrue(response.contains(_tag1));
@@ -688,14 +897,10 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
}
}
- resourceUrl = getResourceUrl("clusterTest1", "db_44");
- ISUrl = resourceUrl + "/idealState";
- paraMap.put(JsonParameters.REPLICAS, "2");
- paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
- paraMap.put(JsonParameters.RESOURCE_KEY_PREFIX, "alias");
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
- paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
- response = assertSuccessPostOperation(ISUrl, paraMap, false);
+ addResource(clusterName, "db_44", 44);
+ resourceUrl = getResourceUrl(clusterName, "db_44");
+ idealStateUrl = resourceUrl + "/idealState";
+ response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", _tag1), false);
Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
Assert.assertTrue(response.contains(_tag1));
@@ -713,24 +918,66 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
}
}
- private void testAddInstance() throws Exception {
- String clusterUrl = getClusterUrl("clusterTest1");
- Map<String, String> paraMap = new HashMap<String, String>();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
- String response = null;
- // Add instances to cluster
+ private void addInstancesToCluster(String clusterName, String instanceNamePrefix, int n,
+ String tag) throws IOException {
+ Map<String, String> parameters = new HashMap<String, String>();
+ final String clusterUrl = getClusterUrl(clusterName);
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+ // add instances to cluster
String instancesUrl = clusterUrl + "/instances";
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < n; i++) {
- paraMap.put(JsonParameters.INSTANCE_NAME, "localhost:123" + i);
- response = assertSuccessPostOperation(instancesUrl, paraMap, false);
- Assert.assertTrue(response.contains(("localhost:123" + i).replace(':', '_')));
+ parameters.put(JsonParameters.INSTANCE_NAME, instanceNamePrefix + i);
+ String response = assertSuccessPostOperation(instancesUrl, parameters, false);
+ Assert.assertTrue(response.contains((instanceNamePrefix + i).replace(':', '_')));
}
- paraMap.remove(JsonParameters.INSTANCE_NAME);
- paraMap.put(JsonParameters.INSTANCE_NAMES,
- "localhost:1233;localhost:1234;localhost:1235;localhost:1236");
- response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+ // add tag to instance
+ if (tag != null && !tag.isEmpty()) {
+ parameters.clear();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
+ parameters.put(ClusterSetup.instanceGroupTag, tag);
+ for (int i = 0; i < n; i++) {
+ String instanceUrl = instancesUrl + "/" + (instanceNamePrefix + i).replace(':', '_');
+ String response = assertSuccessPostOperation(instanceUrl, parameters, false);
+ Assert.assertTrue(response.contains(_tag1));
+ }
+ }
+
+ }
+
+ private Map<String, String> addInstanceTagCmd(String tag) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
+ parameters.put(ClusterSetup.instanceGroupTag, tag);
+
+ return parameters;
+ }
+
+ private Map<String, String> removeInstanceTagCmd(String tag) {
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
+ parameters.put(ClusterSetup.instanceGroupTag, tag);
+
+ return parameters;
+ }
+
+ @Test
+ public void testAddInstance() throws Exception {
+ final String clusterName = "clusterTestAddInstance";
+
+ // add normal cluster
+ addCluster(clusterName);
+
+ String clusterUrl = getClusterUrl(clusterName);
+
+ // Add instances to cluster
+ String instancesUrl = clusterUrl + "/instances";
+ addInstancesToCluster(clusterName, "localhost:123", 3, null);
+
+ String instances = "localhost:1233;localhost:1234;localhost:1235;localhost:1236";
+ String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
for (int i = 3; i <= 6; i++) {
Assert.assertTrue(response.contains("localhost_123" + i));
}
@@ -749,42 +996,34 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
// disable node
instanceUrl = instancesUrl + "/localhost_1236";
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
- paraMap.put(JsonParameters.ENABLED, "false");
- response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
Assert.assertTrue(response.contains("false"));
deleteUrl(instanceUrl, false);
+ // add controller cluster
+ final String controllerClusterName = "controllerClusterTestAddInstance";
+ addCluster(controllerClusterName);
+
// add node to controller cluster
- paraMap.remove(JsonParameters.INSTANCE_NAME);
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
- paraMap.put(JsonParameters.INSTANCE_NAMES, "controller:9000;controller:9001");
- String controllerUrl = getClusterUrl("Klazt3rz") + "/instances";
- response = assertSuccessPostOperation(controllerUrl, paraMap, false);
+ String controllers = "controller:9000;controller:9001";
+ String controllerUrl = getClusterUrl(controllerClusterName) + "/instances";
+ response = assertSuccessPostOperation(controllerUrl, addInstanceCmd(controllers), false);
Assert.assertTrue(response.contains("controller_9000"));
Assert.assertTrue(response.contains("controller_9001"));
- // add a dup host
- paraMap.remove(JsonParameters.INSTANCE_NAMES);
- paraMap.put(JsonParameters.INSTANCE_NAME, "localhost:1234");
- response = assertSuccessPostOperation(instancesUrl, paraMap, true);
-
- // add tags
+ // add a duplicated host
+ response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost:1234"), true);
- paraMap.clear();
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
- paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
+ // add/remove tags
for (int i = 0; i < 4; i++) {
instanceUrl = instancesUrl + "/localhost_123" + i;
- response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ response = assertSuccessPostOperation(instanceUrl, addInstanceTagCmd(_tag1), false);
Assert.assertTrue(response.contains(_tag1));
-
}
- paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
+
instanceUrl = instancesUrl + "/localhost_1233";
- response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+ response = assertSuccessPostOperation(instanceUrl, removeInstanceTagCmd(_tag1), false);
Assert.assertFalse(response.contains(_tag1));
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
index 9534cf5..fd12080 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
@@ -25,8 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,12 +53,8 @@ public class TestResetInstance extends AdminTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- // // start admin thread
- // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
- // adminThread.start();
-
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -69,16 +65,16 @@ public class TestResetInstance extends AdminTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -111,9 +107,6 @@ public class TestResetInstance extends AdminTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- // adminThread.stop();
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
index 9b07445..82a2607 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
@@ -30,10 +30,10 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -95,12 +95,8 @@ public class TestResetPartitionState extends AdminTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- // start admin thread
- // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
- // adminThread.start();
-
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
@@ -108,16 +104,16 @@ public class TestResetPartitionState extends AdminTestBase {
errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -171,9 +167,6 @@ public class TestResetPartitionState extends AdminTestBase {
Assert.assertEquals(_errToOfflineInvoked.get(), 2, "reset() should be invoked 2 times");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- // adminThread.stop();
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
@@ -185,8 +178,7 @@ public class TestResetPartitionState extends AdminTestBase {
private void clearStatusUpdate(String clusterName, String instance, String resource,
String partition) {
// clear status update for error partition so verify() will not fail on
- // old
- // errors
+ // old errors
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
index 96f4f6c..db9e9bb 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
@@ -25,8 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,12 +53,8 @@ public class TestResetResource extends AdminTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- // start admin thread
- // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
- // adminThread.start();
-
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -69,16 +65,16 @@ public class TestResetResource extends AdminTestBase {
};
// start mock participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -112,15 +108,11 @@ public class TestResetResource extends AdminTestBase {
Assert.assertTrue(result, "Cluster verification fails");
// clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- // adminThread.stop();
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
index 083cbd4..27b4d36 100644
--- a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
+++ b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
@@ -21,27 +21,30 @@ package org.apache.helix.agent;
import java.io.File;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ExternalCommand;
import org.apache.helix.ScriptTestHelper;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestHelixAgent extends ZkUnitTestBase {
+ private final static Logger LOG = Logger.getLogger(TestHelixAgent.class);
+
final String workingDir = ScriptTestHelper.getPrefix() + ScriptTestHelper.INTEGRATION_SCRIPT_DIR;
ExternalCommand serverCmd = null;
@@ -94,13 +97,9 @@ public class TestHelixAgent extends ZkUnitTestBase {
"MasterSlave", true); // do rebalance
// set cluster config
- ZkClient client =
- new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- new ZNRecordSerializer());
-
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
- ConfigAccessor configAccessor = new ConfigAccessor(client);
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
// String pidFile = ScriptTestHelper.getPrefix() + ScriptTestHelper.INTEGRATION_LOG_DIR +
// "/default/foo_{PARTITION_NAME}_pid.txt";
@@ -148,10 +147,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
configAccessor.set(scope, cmdConfig.toKeyValueMap());
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+ ClusterControllerManager controller = new ClusterControllerManager(zkAddr, clusterName, "controller_0");
controller.syncStart();
// start helix-agent
+ Map<String, Thread> agents = new HashMap<String, Thread>();
for (int i = 0; i < n; i++) {
final String instanceName = "localhost_" + (12918 + i);
Thread agentThread = new Thread() {
@@ -163,11 +163,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
"--stateModel", "MasterSlave"
});
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception start helix-agent", e);
}
}
};
+ agents.put(instanceName, agentThread);
agentThread.start();
// wait participant thread to start
@@ -197,6 +197,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
clusterName));
Assert.assertTrue(result);
+ // clean up
+ controller.syncStop();
+ for (Thread agentThread : agents.values()) {
+ agentThread.interrupt();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 24f8f1e..54e9943 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -102,6 +102,7 @@ public interface HelixManager {
* @param listener
* @deprecated replaced by addInstanceConfigChangeListener()
*/
+ @Deprecated
void addConfigChangeListener(ConfigChangeListener listener) throws Exception;
/**
@@ -155,6 +156,12 @@ public interface HelixManager {
void addControllerListener(ControllerChangeListener listener);
/**
+ * Add message listener for controller
+ * @param listener
+ */
+ void addControllerMessageListener(MessageListener listener);
+
+ /**
* Removes the listener. If the same listener was used for multiple changes,
* all change notifications will be removed.<br/>
* This will invoke onChange method on the listener with
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 9c0c25e..00189b6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -28,11 +28,14 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.log4j.Logger;
/**
* A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
*/
public class HelixProperty {
+ private static Logger LOG = Logger.getLogger(HelixProperty.class);
+
public enum HelixPropertyAttribute {
BUCKET_SIZE,
BATCH_MESSAGE_MODE
@@ -138,8 +141,7 @@ public class HelixProperty {
});
return constructor.newInstance(record);
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Exception convert znrecord: " + record + " to class: " + clazz, e);
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index c5a7f22..a0ad6f3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
@@ -59,6 +60,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
LOG.debug("Processing resource:" + config.getResourceId());
}
ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+
for (PartitionId partition : config.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
currentState.getCurrentStateMap(config.getResourceId(), partition);
@@ -71,6 +73,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
Map<State, String> upperBounds =
ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
cluster.getConfig());
+
Map<ParticipantId, State> bestStateForPartition =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
.getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a0ee1c1..84129de 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.util;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -110,38 +111,56 @@ public class ConstraintBasedAssignment {
}
/**
- * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
- * @param upperBounds map of state to upper bound
- * @param liveParticipantSet set of live participant ids
- * @param stateModelDef
- * @param participantPreferenceList
- * @param currentStateMap
- * : participant->state for each partition
- * @param disabledParticipantsForPartition
- * @return
+ * Get a mapping for a partition for the current state participants who have been dropped or
+ * disabled for a given partition.
+ * @param currentStateMap current map of participant id to state for a partition
+ * @param participants participants selected to serve the partition
+ * @param disabledParticipants participants that have been disabled for this partition
+ * @param initialState the initial state of the resource state model
+ * @return map of participant id to state of dropped and disabled partitions
*/
- public static Map<ParticipantId, State> computeAutoBestStateForPartition(
- Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
- StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
- Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ public static Map<ParticipantId, State> dropAndDisablePartitions(
+ Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
+ Set<ParticipantId> disabledParticipants, State initialState) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
// if the resource is deleted, instancePreferenceList will be empty and
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((participantPreferenceList == null || !participantPreferenceList
- .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
+ if ((participants == null || !participants.contains(participantId))
+ && !disabledParticipants.contains(participantId)) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
} else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
+ && disabledParticipants.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
+ participantStateMap.put(participantId, initialState);
}
}
}
+ return participantStateMap;
+ }
+
+ /**
+ * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+ * @param upperBounds map of state to upper bound
+ * @param liveParticipantSet set of live participant ids
+ * @param stateModelDef
+ * @param participantPreferenceList
+ * @param currentStateMap
+ * : participant->state for each partition
+ * @param disabledParticipantsForPartition
+ * @return
+ */
+ public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+ Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+ StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+ Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ // drop and disable participants if necessary
+ Map<ParticipantId, State> participantStateMap =
+ dropAndDisablePartitions(currentStateMap, participantPreferenceList,
+ disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
// resource is deleted
if (participantPreferenceList == null) {