You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/04 22:19:19 UTC
[2/7] [HELIX-376] Remove HelixConnection/HelixManager duplicate code
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index 04c0352..495d37c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,11 +25,9 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.testutil.ZkTestBase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -54,9 +52,9 @@ public class TestStartMultipleControllersWithSameName extends ZkTestBase {
// rebalance
// start controller
- ClusterControllerManager[] controllers = new ClusterControllerManager[4];
+ MockController[] controllers = new MockController[4];
for (int i = 0; i < 4; i++) {
- controllers[i] = new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ controllers[i] = new MockController(_zkaddr, clusterName, "controller_0");
controllers[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index e5ff171..c4304b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -32,8 +32,8 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.mock.participant.MockMSStateModel;
import org.apache.helix.mock.participant.MockTransition;
@@ -172,7 +172,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
@Test
public void testStateTransitionTimeOut() throws Exception {
Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
- // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
+ // MockParticipant[] participants = new MockParticipant[NODE_NR];
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
for (int i = 0; i < NODE_NR; i++) {
@@ -185,12 +185,12 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
_participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
_participants[i].syncStart();
}
String controllerName = "controller_0";
- _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
boolean result =
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index 283055c..1927d72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -22,8 +22,7 @@ package org.apache.helix.integration;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -47,7 +46,6 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
idealStateOld2.merge(is2.getRecord());
String instanceName = "localhost_" + (START_PORT + 0);
- ZKHelixAdmin tool = new ZKHelixAdmin(_zkclient);
_setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
boolean result =
@@ -77,8 +75,8 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
exception = true;
}
Assert.assertFalse(exception);
- MockParticipantManager newParticipant =
- new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName2);
+ MockParticipant newParticipant =
+ new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName2);
newParticipant.syncStart();
result =
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 111c4d2..9e8fd85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -31,11 +31,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.manager.ZkTestManager;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.CurrentState;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -63,16 +60,16 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- final ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ final MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -81,7 +78,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
clusterName));
Assert.assertTrue(result);
- final MockParticipantManager participantManagerToExpire = participants[1];
+ final MockParticipant participantManagerToExpire = participants[1];
// check controller zk-watchers
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -116,12 +113,12 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertTrue(result, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");
// check HelixManager#_handlers
- // printHandlers(controllerManager);
- // printHandlers(participantManagerToExpire);
+ TestHelper.printHandlers(controller, controller.getHandlers());
+ TestHelper.printHandlers(participantManagerToExpire, participantManagerToExpire.getHandlers());
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, 9,
- "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(controllerHandlerNb, 10,
+ "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
@@ -208,16 +205,16 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- final ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ final MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -228,7 +225,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertTrue(result);
// final ZkHelixTestManager controllerManager = controller.getManager();
// final ZkHelixTestManager participantManager = participants[0].getManager();
- final MockParticipantManager participantManager = participants[0];
+ final MockParticipant participantManager = participants[0];
// wait until we get all the listeners registered
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -237,7 +234,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
public boolean verify() throws Exception {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
- if (controllerHandlerNb == 9 && particHandlerNb == 2)
+ if (controllerHandlerNb == 10 && particHandlerNb == 2)
return true;
else
return false;
@@ -246,12 +243,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, 9,
- "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
- + controllerHandlerNb + ", " + printHandlers(controller));
+ TestHelper.printHandlers(controller, controller.getHandlers());
+ Assert.assertEquals(controllerHandlerNb, 10,
+ "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+ + controllerHandlerNb);
+ TestHelper.printHandlers(participantManager, participantManager.getHandlers());
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
- + particHandlerNb + ", " + printHandlers(participantManager));
+ + particHandlerNb);
// expire controller
System.out.println("Expiring controller session...");
@@ -301,13 +300,13 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// check HelixManager#_handlers
// printHandlers(controllerManager);
int handlerNb = controller.getHandlers().size();
+ TestHelper.printHandlers(controller, controller.getHandlers());
Assert.assertEquals(handlerNb, controllerHandlerNb,
- "controller callback handlers should not increase after participant session expiry, but was "
- + printHandlers(controller));
+ "controller callback handlers should not increase after participant session expiry");
handlerNb = participantManager.getHandlers().size();
+ TestHelper.printHandlers(participantManager, participantManager.getHandlers());
Assert.assertEquals(handlerNb, particHandlerNb,
- "participant callback handlers should not increase after participant session expiry, but was "
- + printHandlers(participantManager));
+ "participant callback handlers should not increase after participant session expiry");
// clean up
controller.syncStop();
@@ -333,20 +332,19 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true);
- final ClusterControllerManager controller =
- new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+ final MockController controller = new MockController(zkAddr, clusterName, "controller_0");
controller.syncStart();
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
participants[i].syncStart();
// register a controller listener on participant_0
if (i == 0) {
// ZkHelixTestManager manager = participants[0].getManager();
- MockParticipantManager manager = participants[0];
+ MockParticipant manager = participants[0];
manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
@Override
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -365,7 +363,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
clusterName));
Assert.assertTrue(result);
- MockParticipantManager participantToExpire = participants[0];
+ MockParticipant participantToExpire = participants[0];
String oldSessionId = participantToExpire.getSessionId();
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
@@ -446,8 +444,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertNull(childListeners.get(path),
- "Should have no child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
@@ -494,51 +491,4 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
-
- // debug code
- static String printHandlers(ZkTestManager manager) {
- StringBuilder sb = new StringBuilder();
- List<CallbackHandler> handlers = manager.getHandlers();
- sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
-
- for (int i = 0; i < handlers.size(); i++) {
- CallbackHandler handler = handlers.get(i);
- String path = handler.getPath();
- sb.append(path.substring(manager.getClusterName().length() + 1) + ": "
- + handler.getListener());
- if (i < (handlers.size() - 1)) {
- sb.append(", ");
- }
- }
- sb.append("]");
-
- return sb.toString();
- }
-
- void printZkListeners(ZkClient client) throws Exception {
- Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
- Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
- System.out.println("dataListeners {");
- for (String path : datalisteners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkDataListener> set = datalisteners.get(path);
- for (IZkDataListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
-
- System.out.println("childListeners {");
- for (String path : childListeners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkChildListener> set = childListeners.get(path);
- for (IZkChildListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
index 8b4e889..d1ae897 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -30,8 +30,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
@@ -110,17 +110,17 @@ public class TestZkSessionExpiry extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
Set<String> handledMsgSet = new HashSet<String>();
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].getMessagingService().registerMessageHandlerFactory(DUMMY_MSG_TYPE,
new DummyMessageHandlerFactory(handledMsgSet));
participants[i].syncStart();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 8eaf2e7..3098b5c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -21,8 +21,8 @@ package org.apache.helix.integration;
import java.util.Date;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -49,8 +49,8 @@ public class ZkStandAloneCMTestBase extends ZkTestBase {
protected final String CLASS_NAME = this.getClass().getSimpleName();
protected final String CLUSTER_NAME = CLASS_NAME;
- protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
- protected ClusterControllerManager _controller;
+ protected MockParticipant[] _participants = new MockParticipant[NODE_NR];
+ protected MockController _controller;
int _replica = 3;
@@ -76,13 +76,13 @@ public class ZkStandAloneCMTestBase extends ZkTestBase {
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = "localhost_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
// start controller
String controllerName = "controller_0";
- _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
boolean result =
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
deleted file mode 100644
index b8f0f2b..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-
-public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
- private static Logger LOG = Logger.getLogger(ClusterControllerManager.class);
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
- public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
- super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
- }
-
- public void syncStop() {
- _stopCountDown.countDown();
- try {
- _waitStopFinishCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for finish", e);
- }
- }
-
- public void syncStart() {
- // TODO: prevent start multiple times
- new Thread(this).start();
- try {
- _startCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for start", e);
- }
- }
-
- @Override
- public void run() {
- try {
- connect();
- _startCountDown.countDown();
- _stopCountDown.await();
- } catch (Exception e) {
- LOG.error("exception running controller-manager", e);
- } finally {
- _startCountDown.countDown();
- disconnect();
- _waitStopFinishCountDown.countDown();
- }
- }
-
- @Override
- public ZkClient getZkClient() {
- return _zkclient;
- }
-
- @Override
- public List<CallbackHandler> getHandlers() {
- return _handlers;
- }
-
- public List<HelixTimerTask> getControllerTimerTasks() {
- return _controllerTimerTasks;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
deleted file mode 100644
index a17ccc1..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ClusterDistributedController extends ZKHelixManager implements Runnable, ZkTestManager {
- private static Logger LOG = Logger.getLogger(ClusterDistributedController.class);
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
- public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
- super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
- }
-
- public void syncStop() {
- _stopCountDown.countDown();
- try {
- _waitStopFinishCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for finish", e);
- }
- }
-
- public void syncStart() {
- // TODO: prevent start multiple times
- new Thread(this).start();
- try {
- _startCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for start", e);
- }
- }
-
- @Override
- public void run() {
- try {
- StateMachineEngine stateMach = getStateMachineEngine();
- DistClusterControllerStateModelFactory lsModelFactory =
- new DistClusterControllerStateModelFactory(_zkAddress);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-
- connect();
- _startCountDown.countDown();
- _stopCountDown.await();
- } catch (Exception e) {
- LOG.error("exception running controller-manager", e);
- } finally {
- _startCountDown.countDown();
- disconnect();
- _waitStopFinishCountDown.countDown();
- }
- }
-
- @Override
- public ZkClient getZkClient() {
- return _zkclient;
- }
-
- @Override
- public List<CallbackHandler> getHandlers() {
- return _handlers;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
deleted file mode 100644
index 917be17..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockSchemataModelFactory;
-import org.apache.helix.mock.participant.MockTransition;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
- private static Logger LOG = Logger.getLogger(MockParticipantManager.class);
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
-
- private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
-
- public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
- super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
- }
-
- public void setTransition(MockTransition transition) {
- _msModelFactory.setTrasition(transition);
- }
-
- public void syncStop() {
- _stopCountDown.countDown();
- try {
- _waitStopCompleteCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("exception in syncStop participant-manager", e);
- }
- }
-
- public void syncStart() {
- try {
- new Thread(this).start();
- _startCountDown.await();
- } catch (InterruptedException e) {
- LOG.error("exception in syncStart participant-manager", e);
- }
- }
-
- @Override
- public void run() {
- try {
- StateMachineEngine stateMach = getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
-
- DummyLeaderStandbyStateModelFactory lsModelFactory =
- new DummyLeaderStandbyStateModelFactory(10);
- DummyOnlineOfflineStateModelFactory ofModelFactory =
- new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
- stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
-
- MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
- stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
-
- connect();
- _startCountDown.countDown();
-
- _stopCountDown.await();
- } catch (InterruptedException e) {
- String msg =
- "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
- + " is interrupted";
- LOG.info(msg);
- } catch (Exception e) {
- LOG.error("exception running participant-manager", e);
- } finally {
- _startCountDown.countDown();
-
- disconnect();
- _waitStopCompleteCountDown.countDown();
- }
- }
-
- @Override
- public ZkClient getZkClient() {
- return _zkclient;
- }
-
- @Override
- public List<CallbackHandler> getHandlers() {
- return _handlers;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 877cf3c..99986ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -24,13 +24,15 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkCallbackHandler;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.testutil.HelixTestUtil;
@@ -97,19 +99,18 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- final ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ final MockController controller = new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
// start participants
CountDownLatch startCountdown = new CountDownLatch(1);
CountDownLatch endCountdown = new CountDownLatch(1);
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
final String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
if (i == 0) {
participants[i].addPreConnectCallback(new PreConnectTestCallback(instanceName,
@@ -178,14 +179,13 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterDistributedController[] distributedControllers = new ClusterDistributedController[n];
+ MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
CountDownLatch startCountdown = new CountDownLatch(1);
CountDownLatch endCountdown = new CountDownLatch(1);
for (int i = 0; i < n; i++) {
String contrllerName = "localhost_" + (12918 + i);
- distributedControllers[i] =
- new ClusterDistributedController(_zkaddr, clusterName, contrllerName);
+ distributedControllers[i] = new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
if (i == 0) {
@@ -237,12 +237,15 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
Assert.assertNotNull(leader);
Assert.assertEquals(leader.getId(), "localhost_12919");
- // check localhost_12918 has 2 handlers: message and data-accessor
- LOG.debug("handlers: " + TestHelper.printHandlers(distributedControllers[0]));
- List<CallbackHandler> handlers = distributedControllers[0].getHandlers();
- Assert.assertEquals(handlers.size(), 1,
- "Distributed controller should have 1 handler (message) after lose leadership, but was "
- + handlers.size());
+ // check localhost_12918 has 2 handlers: message and leader-election
+ TestHelper.printHandlers(distributedControllers[0], distributedControllers[0].getHandlers());
+ List<ZkCallbackHandler> handlers = distributedControllers[0].getHandlers();
+ Assert
+ .assertEquals(
+ handlers.size(),
+ 2,
+ "Distributed controller should have 2 handler (message and leader election) after lose leadership, but was "
+ + handlers.size());
// clean up
distributedControllers[0].disconnect();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
index 1544dc8..69d1dbe 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -53,10 +55,10 @@ public class TestControllerManager extends ZkTestBase {
// start multiple controllers of same name
int m = 3;
- ClusterControllerManager[] controllers = new ClusterControllerManager[m];
+ MockController[] controllers = new MockController[m];
for (int i = 0; i < m; i++) {
- controllers[i] = new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ controllers[i] = new MockController(_zkaddr, clusterName, "controller");
controllers[i].syncStart();
}
@@ -69,11 +71,11 @@ public class TestControllerManager extends ZkTestBase {
}
Assert.assertEquals(leaderCnt, 1, "Should have only 1 leader but was " + leaderCnt);
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
final String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -107,7 +109,7 @@ public class TestControllerManager extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
"localhost", // participant name prefix
@@ -119,14 +121,14 @@ public class TestControllerManager extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index f915c4f..18234b5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -27,9 +27,10 @@ import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.MockMultiClusterController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkCallbackHandler;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.testutil.ZkTestBase;
@@ -109,8 +110,8 @@ public class TestDistributedControllerManager extends ZkTestBase {
* @param newController
* @throws Exception
*/
- void expireController(ClusterDistributedController expireController,
- ClusterDistributedController newController) throws Exception {
+ void expireController(MockMultiClusterController expireController,
+ MockMultiClusterController newController) throws Exception {
String clusterName = expireController.getClusterName();
LOG.info("Expiring distributedController: " + expireController.getInstanceName()
+ ", session: " + expireController.getSessionId() + " ...");
@@ -136,13 +137,12 @@ public class TestDistributedControllerManager extends ZkTestBase {
Assert.assertNotNull(leader);
Assert.assertEquals(leader.getId(), newController.getInstanceName());
- // check expired-controller has 2 handlers: message and data-accessor
- LOG.debug(expireController.getInstanceName() + " handlers: "
- + TestHelper.printHandlers(expireController));
+ // check expired-controller has 2 handlers: message and leader-election
+ TestHelper.printHandlers(expireController, expireController.getHandlers());
- List<CallbackHandler> handlers = expireController.getHandlers();
- Assert.assertEquals(handlers.size(), 1,
- "Distributed controller should have 1 handler (message) after lose leadership, but was "
+ List<ZkCallbackHandler> handlers = expireController.getHandlers();
+ Assert.assertEquals(handlers.size(), 2,
+ "Distributed controller should have 2 handler (message and leader-election) after lose leadership, but was "
+ handlers.size());
}
@@ -165,12 +165,12 @@ public class TestDistributedControllerManager extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterDistributedController[] distributedControllers = new ClusterDistributedController[n];
+ MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
for (int i = 0; i < n; i++) {
String contrllerName = "localhost_" + (12918 + i);
distributedControllers[i] =
- new ClusterDistributedController(_zkaddr, clusterName, contrllerName);
+ new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
distributedControllers[i].connect();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 4d46883..309ab18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -34,6 +34,8 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -110,7 +112,7 @@ public class TestParticipantManager extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
"localhost", // participant name prefix
@@ -122,14 +124,14 @@ public class TestParticipantManager extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -197,7 +199,7 @@ public class TestParticipantManager extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
"localhost", // participant name prefix
@@ -209,14 +211,14 @@ public class TestParticipantManager extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].setTransition(new SessionExpiryTransition(startCountdown, endCountdown));
participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
index b82f156..d6d7bab 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -28,6 +28,8 @@ import java.util.TreeMap;
import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.participant.HelixStateMachineEngine;
@@ -70,15 +72,15 @@ public class TestStateModelLeak extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
final String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -144,15 +146,15 @@ public class TestStateModelLeak extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
final String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
if (i == 0) {
Map<String, Set<String>> errTransitionMap = new HashMap<String, Set<String>>();
Set<String> partitions = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 650f13f..1393231 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -31,6 +31,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.CurrentState;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -61,16 +63,15 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- final ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ final MockController controller = new MockController(_zkaddr, clusterName, "controller");
controller.connect();
// start participants
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -98,7 +99,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
// check participant zk-watchers
- final MockParticipantManager participantManagerToExpire = participants[0];
+ final MockParticipant participantManagerToExpire = participants[0];
result = TestHelper.verify(new TestHelper.Verifier() {
@Override
@@ -118,8 +119,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// printHandlers(participantManagerToExpire);
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
- "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+ "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
@@ -200,16 +201,15 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- final ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ final MockController controller = new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
// start participants
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
@@ -220,14 +220,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertTrue(result);
// wait until we get all the listeners registered
- final MockParticipantManager participantManager = participants[0];
+ final MockParticipant participantManager = participants[0];
result = TestHelper.verify(new TestHelper.Verifier() {
@Override
public boolean verify() throws Exception {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
- if (controllerHandlerNb == 9 && particHandlerNb == 2)
+ if (controllerHandlerNb == 10 && particHandlerNb == 2)
return true;
else
return false;
@@ -236,12 +236,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
- "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
- + controllerHandlerNb + ", " + TestHelper.printHandlers(controller));
+ TestHelper.printHandlers(controller, controller.getHandlers());
+ TestHelper.printHandlers(participantManager, participantManager.getHandlers());
+ Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+ "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+ + controllerHandlerNb);
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
- + particHandlerNb + ", " + TestHelper.printHandlers(participantManager));
+ + particHandlerNb);
// expire controller
LOG.debug("Expiring controller session...");
@@ -292,13 +294,13 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
// check HelixManager#_handlers
// printHandlers(controllerManager);
int handlerNb = controller.getHandlers().size();
+ TestHelper.printHandlers(controller, controller.getHandlers());
Assert.assertEquals(handlerNb, controllerHandlerNb,
- "controller callback handlers should not increase after participant session expiry, but was "
- + TestHelper.printHandlers(controller));
+ "controller callback handlers should not increase after participant session expiry");
handlerNb = participantManager.getHandlers().size();
+ TestHelper.printHandlers(participantManager, participantManager.getHandlers());
Assert.assertEquals(handlerNb, particHandlerNb,
- "participant callback handlers should not increase after participant session expiry, but was "
- + TestHelper.printHandlers(participantManager));
+ "participant callback handlers should not increase after participant session expiry");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -318,19 +320,18 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
2, // replicas
"MasterSlave", true);
- final ClusterControllerManager controller =
- new ClusterControllerManager(zkAddr, clusterName, "controller");
+ final MockController controller = new MockController(zkAddr, clusterName, "controller");
controller.syncStart();
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
participants[i].syncStart();
// register a controller listener on participant_0
if (i == 0) {
- MockParticipantManager manager = participants[0];
+ MockParticipant manager = participants[0];
manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
@Override
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -349,7 +350,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
clusterName));
Assert.assertTrue(result);
- MockParticipantManager participantToExpire = participants[0];
+ MockParticipant participantToExpire = participants[0];
String oldSessionId = participantToExpire.getSessionId();
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
@@ -371,9 +372,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
+ path);
Assert
- .assertEquals(
- childListeners.size(),
- 2,
+ .assertEquals(childListeners.size(), 2,
"Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 1,
@@ -382,17 +381,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertNull(childListeners.get(path),
- "Should have no child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
Assert
- .assertEquals(
- watchPaths.get("dataWatches").size(),
- 3,
+ .assertEquals(watchPaths.get("dataWatches").size(), 3,
"Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
"Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
@@ -435,8 +431,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertNull(childListeners.get(path),
- "Should have no child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 2e2c8b6..22beed1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -30,9 +30,9 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.ScheduleConfig;
@@ -63,8 +63,8 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
private static final int n = 5;
private static final int START_PORT = 12918;
private final String CLUSTER_NAME = "TestIndependentTaskRebalancer";
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
+ private final MockParticipant[] _participants = new MockParticipant[n];
+ private MockController _controller;
private Set<String> _invokedClasses = Sets.newHashSet();
private Map<String, Integer> _runCounts = Maps.newHashMap();
@@ -104,7 +104,7 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
}
});
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -115,7 +115,7 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
// Start controller
String controllerName = "controller_0";
- _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
// Start an admin connection
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index e39615d..84c0e1d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -30,8 +30,8 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -66,8 +66,8 @@ public class TestTaskRebalancer extends ZkTestBase {
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = "TestTaskRebalancer";
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
+ private final MockParticipant[] _participants = new MockParticipant[n];
+ private MockController _controller;
private HelixManager _manager;
private TaskDriver _driver;
@@ -102,7 +102,7 @@ public class TestTaskRebalancer extends ZkTestBase {
// start dummy participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -113,7 +113,7 @@ public class TestTaskRebalancer extends ZkTestBase {
// start controller
String controllerName = "controller_0";
- _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
// create cluster manager
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 6de361d..e3e8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -27,8 +27,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Task;
@@ -60,8 +60,8 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = "TestTaskRebalancerStopResume";
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
+ private final MockParticipant[] _participants = new MockParticipant[n];
+ private MockController _controller;
private HelixManager _manager;
private TaskDriver _driver;
@@ -94,7 +94,7 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
// start dummy participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -106,7 +106,7 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
// start controller
String controllerName = "controller_0";
- _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
// create cluster manager
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
index 376481e..d6edf17 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
@@ -21,11 +21,11 @@ package org.apache.helix.manager;
import java.util.List;
-import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
@@ -37,7 +37,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
public class MockListener implements IdealStateChangeListener, LiveInstanceChangeListener,
- ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+ InstanceConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
ControllerChangeListener, MessageListener
{
@@ -71,7 +71,7 @@ public class MockListener implements IdealStateChangeListener, LiveInstanceChang
}
@Override
- public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
+ public void onInstanceConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
isConfigChangeListenerInvoked = true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
new file mode 100644
index 0000000..f4d2159
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
@@ -0,0 +1,114 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper: a {@link ZKHelixManager} of type {@link InstanceType#CONTROLLER} that connects on
+ * a dedicated thread. {@link #syncStart()} and {@link #syncStop()} block on latches until
+ * {@link #run()} has attempted to connect or has finished its disconnect attempt, respectively.
+ */
+public class MockController extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockController.class);
+
+  // Released once run() has attempted connect(), whether or not it succeeded.
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  // Released by syncStop() to let run() proceed to disconnect().
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  // Released once run() has finished its disconnect() attempt.
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  public MockController(String zkAddr, String clusterName, String controllerName) {
+    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
+  }
+
+  /**
+   * Signals the run() thread to disconnect and waits for it to finish. If the calling thread is
+   * interrupted while waiting, the interrupt status is restored and the method returns early.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopFinishCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for finish", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts run() on a new thread and waits until it has attempted to connect. If the calling
+   * thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns early.
+   */
+  public void syncStart() {
+    // TODO: prevent start multiple times
+    new Thread(this).start();
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for start", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Connects, waits for the stop signal, then disconnects and releases the stop latch. */
+  @Override
+  public void run() {
+    try {
+      connect();
+      _startCountDown.countDown();
+      _stopCountDown.await();
+    } catch (Exception e) {
+      LOG.error("exception running controller-manager", e);
+    } finally {
+      // Release the start latch here too so syncStart() cannot hang if connect() threw.
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // Always release, even if disconnect() throws, so syncStop() cannot hang.
+        _waitStopFinishCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns the ZkClient held by the underlying connection. */
+  public ZkClient getZkClient() {
+    return getConn()._zkclient;
+  }
+
+  /** Returns this manager's connection, cast to its ZooKeeper implementation. */
+  public ZkHelixConnection getConn() {
+    return (ZkHelixConnection) _role.getConnection();
+  }
+
+  /** Returns the callback handlers the connection holds for this controller's role. */
+  public List<ZkCallbackHandler> getHandlers() {
+    return getConn()._handlers.get(_role);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
new file mode 100644
index 0000000..7f8b1a3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
@@ -0,0 +1,131 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper: a {@link ZKHelixManager} of type {@link InstanceType#CONTROLLER_PARTICIPANT} that
+ * registers a {@link DistClusterControllerStateModelFactory} and connects on a dedicated thread.
+ * {@link #syncStart()} and {@link #syncStop()} block on latches until {@link #run()} has
+ * attempted to connect or has finished its disconnect attempt, respectively.
+ */
+public class MockMultiClusterController extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockMultiClusterController.class);
+
+  // Released once run() has attempted connect(), whether or not it succeeded.
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  // Released by syncStop() to let run() proceed to disconnect().
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  // Released once run() has finished its disconnect() attempt.
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  public MockMultiClusterController(String zkAddr, String clusterName, String controllerName) {
+    super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
+  }
+
+  /**
+   * Signals the run() thread to disconnect and waits for it to finish. If the calling thread is
+   * interrupted while waiting, the interrupt status is restored and the method returns early.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopFinishCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for finish", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts run() on a new thread and waits until it has attempted to connect. If the calling
+   * thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns early.
+   */
+  public void syncStart() {
+    // TODO: prevent start multiple times
+    new Thread(this).start();
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for start", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers the "LeaderStandby" state model factory, connects, waits for the stop signal, then
+   * disconnects and releases the stop-finished latch.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      DistClusterControllerStateModelFactory lsModelFactory =
+          new DistClusterControllerStateModelFactory(_zkAddress);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+
+      connect();
+      _startCountDown.countDown();
+      _stopCountDown.await();
+    } catch (Exception e) {
+      LOG.error("exception running controller-manager", e);
+    } finally {
+      // Release the start latch here too so syncStart() cannot hang on a failed startup.
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // Always release, even if disconnect() throws, so syncStop() cannot hang.
+        _waitStopFinishCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns this manager's connection, cast to its ZooKeeper implementation. */
+  public ZkHelixConnection getConn() {
+    return (ZkHelixConnection) _role.getConnection();
+  }
+
+  /** Returns the ZkClient held by the underlying connection. */
+  public ZkClient getZkClient() {
+    return getConn()._zkclient;
+  }
+
+  /** Returns a new list with the callback handlers from every entry of the handler map. */
+  public List<ZkCallbackHandler> getHandlers() {
+    List<ZkCallbackHandler> handlers = new ArrayList<ZkCallbackHandler>();
+    for (List<ZkCallbackHandler> handlerList : getConn()._handlers.values()) {
+      handlers.addAll(handlerList);
+    }
+
+    return handlers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
new file mode 100644
index 0000000..f107d3d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
@@ -0,0 +1,153 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockSchemataModelFactory;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper: a {@link ZKHelixManager} of type {@link InstanceType#PARTICIPANT} that registers a
+ * set of mock/dummy state model factories and connects on a dedicated thread.
+ * {@link #syncStart()} and {@link #syncStop()} block on latches until {@link #run()} has
+ * attempted to connect or has finished its disconnect attempt, respectively.
+ */
+public class MockParticipant extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockParticipant.class);
+
+  // Released once run() has attempted connect(), whether or not it succeeded.
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  // Released by syncStop() to let run() proceed to disconnect().
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  // Released once run() has finished its disconnect() attempt.
+  private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
+
+  private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
+
+  public MockParticipant(String zkAddress, String clusterName, String instanceName) {
+    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+  }
+
+  /** Forwards the transition to the factory that run() registers under "MasterSlave". */
+  public void setTransition(MockTransition transition) {
+    _msModelFactory.setTrasition(transition);
+  }
+
+  /**
+   * Signals the run() thread to disconnect and waits for it to finish. If the calling thread is
+   * interrupted while waiting, the interrupt status is restored and the method returns early.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopCompleteCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStop participant-manager", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts run() on a new thread and waits until it has attempted to connect. If the calling
+   * thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns early.
+   */
+  public void syncStart() {
+    new Thread(this).start();
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStart participant-manager", e);
+      // Restore the flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers the state model factories, connects, waits for the stop signal, then disconnects
+   * and releases the stop-complete latch.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+      DummyLeaderStandbyStateModelFactory lsModelFactory =
+          new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory ofModelFactory =
+          new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+
+      connect();
+      _startCountDown.countDown();
+
+      _stopCountDown.await();
+    } catch (InterruptedException e) {
+      String msg =
+          "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
+              + " is interrupted";
+      LOG.info(msg);
+    } catch (Exception e) {
+      LOG.error("exception running participant-manager", e);
+    } finally {
+      // Release the start latch here too so syncStart() cannot hang on a failed startup.
+      _startCountDown.countDown();
+
+      try {
+        disconnect();
+      } finally {
+        // Always release, even if disconnect() throws, so syncStop() cannot hang.
+        _waitStopCompleteCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns the connection backing this manager's role. */
+  public HelixConnection getConn() {
+    return _role.getConnection();
+  }
+
+  /** Returns the ZkClient held by the underlying connection. */
+  public ZkClient getZkClient() {
+    ZkHelixConnection conn = (ZkHelixConnection) getConn();
+    return conn._zkclient;
+  }
+
+  /** Returns the callback handlers the connection holds for this participant's role. */
+  public List<ZkCallbackHandler> getHandlers() {
+    ZkHelixConnection conn = (ZkHelixConnection) getConn();
+    return conn._handlers.get(_role);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index ca0d4ab..3b5b24e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -23,7 +23,6 @@ import java.util.Date;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.testutil.ZkTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -47,8 +46,8 @@ public class TestHandleNewSession extends ZkTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- MockParticipantManager participant =
- new MockParticipantManager(_zkaddr, clusterName, "localhost_12918");
+ MockParticipant participant =
+ new MockParticipant(_zkaddr, clusterName, "localhost_12918");
participant.syncStart();
// Logger.getRootLogger().setLevel(Level.INFO);
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index e59dd0c..7e2f4a7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -20,7 +20,6 @@ package org.apache.helix.manager.zk;
*/
import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -40,7 +39,7 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBase {
e.printStackTrace();
}
// restart the participant
- _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
Thread.sleep(100);
}