You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:09:56 UTC
[02/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index be65ad1..00537a4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -30,13 +30,13 @@ import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.manager.ZkTestManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -63,15 +63,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -80,9 +81,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
- final ZkHelixTestManager controllerManager = controller.getManager();
- final ZkHelixTestManager participantManagerToExpire =
- participants[1].getManager();
+ final MockParticipantManager participantManagerToExpire = participants[1];
// check controller zk-watchers
result = TestHelper.verify(new TestHelper.Verifier() {
@@ -90,7 +89,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
@Override
public boolean verify() throws Exception {
Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+ // Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
// System.out.println("controller watch paths: " + watchPaths);
// controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -118,7 +118,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check HelixManager#_handlers
// printHandlers(controllerManager);
// printHandlers(participantManagerToExpire);
- int controllerHandlerNb = controllerManager.getHandlers().size();
+ int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
@@ -145,7 +145,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
@Override
public boolean verify() throws Exception {
Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
// System.out.println("controller watch paths after session expiry: " + watchPaths);
// controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -173,13 +173,19 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check handlers
// printHandlers(controllerManager);
// printHandlers(participantManagerToExpire);
- int handlerNb = controllerManager.getHandlers().size();
+ int handlerNb = controller.getHandlers().size();
Assert.assertEquals(handlerNb, controllerHandlerNb,
"controller callback handlers should not increase after participant session expiry");
handlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(handlerNb, particHandlerNb,
"participant callback handlers should not increase after participant session expiry");
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -202,15 +208,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -219,15 +226,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
- final ZkHelixTestManager controllerManager = controller.getManager();
- final ZkHelixTestManager participantManager = participants[0].getManager();
+ // final ZkHelixTestManager controllerManager = controller.getManager();
+ // final ZkHelixTestManager participantManager = participants[0].getManager();
+ final MockParticipantManager participantManager = participants[0];
// wait until we get all the listeners registered
result = TestHelper.verify(new TestHelper.Verifier() {
@Override
public boolean verify() throws Exception {
- int controllerHandlerNb = controllerManager.getHandlers().size();
+ int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
if (controllerHandlerNb == 9 && particHandlerNb == 2)
return true;
@@ -236,21 +244,21 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
}
}, 1000);
- int controllerHandlerNb = controllerManager.getHandlers().size();
+ int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
- + controllerHandlerNb + ", " + printHandlers(controllerManager));
+ + controllerHandlerNb + ", " + printHandlers(controller));
Assert.assertEquals(particHandlerNb, 2,
"HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+ particHandlerNb + ", " + printHandlers(participantManager));
// expire controller
System.out.println("Expiring controller session...");
- String oldSessionId = controllerManager.getSessionId();
+ String oldSessionId = controller.getSessionId();
- ZkTestHelper.expireSession(controllerManager.getZkClient());
- String newSessionId = controllerManager.getSessionId();
+ ZkTestHelper.expireSession(controller.getZkClient());
+ String newSessionId = controller.getSessionId();
System.out.println("Expired controller session. oldSessionId: " + oldSessionId
+ ", newSessionId: " + newSessionId);
@@ -265,7 +273,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
@Override
public boolean verify() throws Exception {
Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
// System.out.println("controller watch paths after session expiry: " + watchPaths);
// controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -292,15 +300,21 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check HelixManager#_handlers
// printHandlers(controllerManager);
- int handlerNb = controllerManager.getHandlers().size();
+ int handlerNb = controller.getHandlers().size();
Assert.assertEquals(handlerNb, controllerHandlerNb,
"controller callback handlers should not increase after participant session expiry, but was "
- + printHandlers(controllerManager));
+ + printHandlers(controller));
handlerNb = participantManager.getHandlers().size();
Assert.assertEquals(handlerNb, particHandlerNb,
"participant callback handlers should not increase after participant session expiry, but was "
+ printHandlers(participantManager));
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -319,18 +333,20 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
2, // replicas
"MasterSlave", true);
- ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+ final ClusterControllerManager controller =
+ new ClusterControllerManager(zkAddr, clusterName, "controller_0");
controller.syncStart();
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
+ participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
participants[i].syncStart();
// register a controller listener on participant_0
if (i == 0) {
- ZkHelixTestManager manager = participants[0].getManager();
+ // ZkHelixTestManager manager = participants[0].getManager();
+ MockParticipantManager manager = participants[0];
manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
@Override
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -349,7 +365,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
clusterName));
Assert.assertTrue(result);
- ZkHelixTestManager participantToExpire = participants[0].getManager();
+ MockParticipantManager participantToExpire = participants[0];
String oldSessionId = participantToExpire.getSessionId();
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
@@ -474,11 +490,18 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
"Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
// Thread.sleep(1000);
+
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
// debug code
- static String printHandlers(ZkHelixTestManager manager) {
+ static String printHandlers(ZkTestManager manager) {
StringBuilder sb = new StringBuilder();
List<CallbackHandler> handlers = manager.getHandlers();
sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index d04fbfd..9188e61 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -19,14 +19,13 @@ package org.apache.helix.integration;
* under the License.
*/
-import java.util.Map;
import java.util.logging.Level;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -37,7 +36,6 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.util.ZKClientPool;
import org.apache.log4j.Logger;
-import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
@@ -85,7 +83,7 @@ public class ZkIntegrationTestBase {
protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
Builder keyBuilder = accessor.keyBuilder();
LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
@@ -95,49 +93,6 @@ public class ZkIntegrationTestBase {
return leader.getInstanceName();
}
- /**
- * Stop current leader and returns the new leader
- * @param zkClient
- * @param clusterName
- * @param startCMResultMap
- * @return
- */
- protected String stopCurrentLeader(ZkClient zkClient, String clusterName,
- Map<String, StartCMResult> startCMResultMap) {
- String leader = getCurrentLeader(zkClient, clusterName);
- Assert.assertTrue(leader != null);
- System.out.println("stop leader: " + leader + " in " + clusterName);
- Assert.assertTrue(leader != null);
-
- StartCMResult result = startCMResultMap.remove(leader);
- Assert.assertTrue(result._manager != null);
- result._manager.disconnect();
-
- Assert.assertTrue(result._thread != null);
- result._thread.interrupt();
-
- boolean isNewLeaderElected = false;
- String newLeader = null;
- try {
- for (int i = 0; i < 5; i++) {
- Thread.sleep(1000);
- newLeader = getCurrentLeader(zkClient, clusterName);
- if (!newLeader.equals(leader)) {
- isNewLeaderElected = true;
- System.out.println("new leader elected: " + newLeader + " in " + clusterName);
- break;
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (isNewLeaderElected == false) {
- System.out.println("fail to elect a new leader in " + clusterName);
- }
- AssertJUnit.assertTrue(isNewLeaderElected);
- return newLeader;
- }
-
protected void enableHealthCheck(String clusterName) {
ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index e759fc7..5d169d5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -20,16 +20,9 @@ package org.apache.helix.integration;
*/
import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -57,8 +50,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
- protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
- protected ZkClient _zkClient;
+ protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+ protected ClusterControllerManager _controller;
int _replica = 3;
@@ -67,11 +60,9 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
_setupTool = new ClusterSetup(ZK_ADDR);
@@ -87,21 +78,14 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start particpant:" + instanceName
- + "(participant with same name already exists)");
- } else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, result);
- }
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
}
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
boolean result =
ClusterStateVerifier
@@ -119,30 +103,11 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
* shutdown order: 1) disconnect the controller 2) disconnect participants
*/
- StartCMResult result;
- Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- if (instanceName.startsWith(CONTROLLER_PREFIX)) {
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
- }
- }
-
- Thread.sleep(100);
- it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
+ _controller.syncStop();
+ for (int i = 0; i < NODE_NR; i++) {
+ _participants[i].syncStop();
}
- _zkClient.close();
- // logger.info("END at " + new Date(System.currentTimeMillis()));
System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
index f19e5dd..c6fbea6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
@@ -23,8 +23,11 @@ import java.util.List;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.StatusUpdate;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -36,6 +39,7 @@ import org.testng.annotations.BeforeClass;
*/
public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
ZKPropertyTransferServer.PERIOD = 500;
@@ -44,19 +48,20 @@ public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneC
super.beforeClass();
Thread.sleep(1000);
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder kb = accessor.keyBuilder();
+
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- HelixDataAccessor accessor =
- _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<StatusUpdate> statusUpdates =
- accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+ String instanceName = _participants[i].getInstanceName();
+ List<StatusUpdate> statusUpdates =
+ accessor.getChildValues(kb.stateTransitionStatus(instanceName,
+ _participants[i].getSessionId(), TEST_DB));
+
for (int j = 0; j < 10; j++) {
statusUpdates =
accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+ _participants[i].getSessionId(), TEST_DB));
if (statusUpdates.size() == 0) {
Thread.sleep(500);
} else {
@@ -70,10 +75,10 @@ public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneC
Assert
.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
}
- }
}
}
+ @Override
@AfterClass
public void afterClass() throws Exception {
super.afterClass();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index e0da9fb..b8f0f2b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -22,12 +22,14 @@ package org.apache.helix.integration.manager;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.log4j.Logger;
-public class ClusterControllerManager extends ControllerManager implements Runnable, ZkTestManager {
+public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
private static Logger LOG = Logger.getLogger(ClusterControllerManager.class);
private final CountDownLatch _startCountDown = new CountDownLatch(1);
@@ -35,7 +37,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
- super(zkAddr, clusterName, controllerName);
+ super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
}
public void syncStop() {
@@ -43,8 +45,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
try {
_waitStopFinishCountDown.await();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted waiting for finish", e);
}
}
@@ -54,8 +55,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
try {
_startCountDown.await();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted waiting for start", e);
}
}
@@ -84,4 +84,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
return _handlers;
}
+ public List<HelixTimerTask> getControllerTimerTasks() {
+ return _controllerTimerTasks;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 751c3cb..44d0957 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -22,12 +22,15 @@ package org.apache.helix.integration.manager;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.DistributedControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
-public class ClusterDistributedController extends DistributedControllerManager implements Runnable,
+public class ClusterDistributedController extends ZKHelixManager implements Runnable,
ZkTestManager {
private static Logger LOG = Logger.getLogger(ClusterDistributedController.class);
@@ -36,7 +39,7 @@ public class ClusterDistributedController extends DistributedControllerManager i
private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
- super(zkAddr, clusterName, controllerName);
+ super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
}
public void syncStop() {
@@ -44,8 +47,7 @@ public class ClusterDistributedController extends DistributedControllerManager i
try {
_waitStopFinishCountDown.await();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted waiting for finish", e);
}
}
@@ -55,14 +57,18 @@ public class ClusterDistributedController extends DistributedControllerManager i
try {
_startCountDown.await();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted waiting for start", e);
}
}
@Override
public void run() {
try {
+ StateMachineEngine stateMach = getStateMachineEngine();
+ DistClusterControllerStateModelFactory lsModelFactory =
+ new DistClusterControllerStateModelFactory(_zkAddress);
+ stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+
connect();
_startCountDown.countDown();
_stopCountDown.await();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 8249f4a..34efe34 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -22,18 +22,20 @@ package org.apache.helix.integration.manager;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockJobIntf;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
-public class MockParticipantManager extends ParticipantManager implements Runnable, ZkTestManager {
+public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
private static Logger LOG = Logger.getLogger(MockParticipantManager.class);
private final CountDownLatch _startCountDown = new CountDownLatch(1);
@@ -43,7 +45,7 @@ public class MockParticipantManager extends ParticipantManager implements Runnab
private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
- super(zkAddr, clusterName, instanceName);
+ super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
}
public void setTransition(MockTransition transition) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index b5ef255..aa00a8d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -22,14 +22,16 @@ package org.apache.helix.integration.manager;
import java.util.Date;
import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.DistributedControllerManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.model.LiveInstance;
@@ -61,11 +63,12 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- DistributedControllerManager[] distributedControllers = new DistributedControllerManager[n];
+ HelixManager[] distributedControllers = new HelixManager[n];
for (int i = 0; i < n; i++) {
int port = 12918 + i;
distributedControllers[i] =
- new DistributedControllerManager(ZK_ADDR, clusterName, "localhost_" + port);
+ new ZKHelixManager(clusterName, "localhost_" + port, InstanceType.CONTROLLER_PARTICIPANT,
+ ZK_ADDR);
distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
distributedControllers[i].connect();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index a818fd3..82f583f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
@@ -33,9 +35,8 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.manager.zk.ControllerManager;
-import org.apache.helix.manager.zk.ParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -70,13 +71,14 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
1, // replicas
"MasterSlave", true); // do rebalance
- ParticipantManager participant =
- new ParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+ HelixManager participant =
+ new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
participant.connect();
- ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+ HelixManager controller =
+ new ZKHelixManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
controller.connect();
boolean result =
@@ -121,8 +123,9 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
@@ -151,7 +154,7 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
Assert.assertNotSame(newSessionId, oldSessionId);
// cleanup
- controller.disconnect();
+ controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
@@ -207,8 +210,9 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
// start controller
- ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
@@ -245,7 +249,7 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
Assert.assertTrue(errString.indexOf("InterruptedException") != -1);
// cleanup
- controller.disconnect();
+ controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
index 1b9ee62..2aa4544 100644
--- a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
@@ -40,8 +40,8 @@ public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
"integrationTest"
})
public void testJosqlQuery() throws Exception {
- HelixManager manager =
- ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+ HelixManager manager = _participants[0];
+ // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
// Find the instance name that contains partition TestDB_2 and state is 'MASTER'
String SQL =
@@ -183,8 +183,8 @@ public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
@Test(groups = ("unitTest"))
public void testOrderby() throws Exception {
- HelixManager manager =
- ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+ HelixManager manager = _participants[0];
+ // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
for (int i = 0; i < NODE_NR; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
index 4cef5a0..8b5b30c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
@@ -30,10 +30,13 @@ import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory.Defaul
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
public class TestDefaultControllerMsgHandlerFactory {
+ private static Logger LOG = Logger.getLogger(TestDefaultControllerMsgHandlerFactory.class);
+
@Test()
public void testDefaultControllerMsgHandlerFactory() {
System.out.println("START TestDefaultControllerMsgHandlerFactory at "
@@ -70,8 +73,7 @@ public class TestDefaultControllerMsgHandlerFactory {
} catch (HelixException e) {
exceptionCaught = true;
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted handling message", e);
}
AssertJUnit.assertTrue(exceptionCaught);
@@ -83,8 +85,7 @@ public class TestDefaultControllerMsgHandlerFactory {
} catch (HelixException e) {
exceptionCaught = true;
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted handling message", e);
}
AssertJUnit.assertFalse(exceptionCaught);
System.out.println("END TestDefaultControllerMsgHandlerFactory at "
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index 3cca10c..352cdd5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -21,10 +21,10 @@ package org.apache.helix.manager.zk;
import java.util.Date;
-import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -47,17 +47,17 @@ public class TestHandleNewSession extends ZkIntegrationTestBase {
3, // replicas
"MasterSlave", true); // do rebalance
- ZKHelixManager manager =
- new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
- manager.connect();
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+ participant.syncStart();
// Logger.getRootLogger().setLevel(Level.INFO);
- String lastSessionId = manager.getSessionId();
+ String lastSessionId = participant.getSessionId();
for (int i = 0; i < 3; i++) {
// System.err.println("curSessionId: " + lastSessionId);
- ZkTestHelper.expireSession(manager._zkClient);
+ ZkTestHelper.expireSession(participant.getZkClient());
- String sessionId = manager.getSessionId();
+ String sessionId = participant.getSessionId();
Assert.assertTrue(sessionId.compareTo(lastSessionId) > 0,
"Session id should be increased after expiry");
lastSessionId = sessionId;
@@ -71,7 +71,7 @@ public class TestHandleNewSession extends ZkIntegrationTestBase {
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("Disconnecting ...");
- manager.disconnect();
+ participant.syncStop();
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index a49d655..547e863 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -19,10 +19,8 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -30,24 +28,20 @@ import org.testng.annotations.Test;
public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
@Test
public void testInstanceBounce() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult controllerResult = _startCMResultMap.get(controllerName);
- ZkHelixTestManager controller = controllerResult._manager;
- int handlerSize = controller.getHandlers().size();
+ int handlerSize = _controller.getHandlers().size();
for (int i = 0; i < 2; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
// kill 2 participants
- _startCMResultMap.get(instanceName)._manager.disconnect();
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[i].syncStop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// restart the participant
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, result);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
Thread.sleep(100);
}
Thread.sleep(4000);
@@ -61,11 +55,11 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertySe
// and we will remove current-state listener on expired session
// so the number of callback handlers is unchanged
for (int j = 0; j < 10; j++) {
- if (controller.getHandlers().size() == (handlerSize)) {
+ if (_controller.getHandlers().size() == (handlerSize)) {
break;
}
Thread.sleep(400);
}
- Assert.assertEquals(controller.getHandlers().size(), handlerSize);
+ Assert.assertEquals(_controller.getHandlers().size(), handlerSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
index c85f207..50a9a78 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
@@ -20,11 +20,10 @@ package org.apache.helix.manager.zk;
*/
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -35,34 +34,28 @@ public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithProp
@Test
public void TestControllerChange() throws Exception {
String controllerName = CONTROLLER_PREFIX + "_0";
- _startCMResultMap.get(controllerName)._manager.disconnect();
+ _controller.syncStop();
Thread.sleep(1000);
// kill controller, participant should not know about the svc url
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
HelixDataAccessor accessor =
- _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+ _participants[i].getHelixDataAccessor();
ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
|| zkAccessor._zkPropertyTransferSvcUrl.equals(""));
}
- _startCMResultMap.get(controllerName)._thread.interrupt();
- _startCMResultMap.remove(controllerName);
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
Thread.sleep(1000);
// create controller again, the svc url is notified to the participants
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
HelixDataAccessor accessor =
- _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+ _participants[i].getHelixDataAccessor();
ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl.equals(ZKPropertyTransferServer
.getInstance().getWebserviceUrl()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index c099232..83dc986 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -27,6 +27,12 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.AccessOption;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -35,14 +41,10 @@ import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.MockListener;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
@@ -65,6 +67,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
ZKHelixManager controller =
new ZKHelixManager(clusterName, null, InstanceType.CONTROLLER, ZK_ADDR);
+
try {
controller.connect();
Assert.fail("Should throw HelixException if initial cluster structure is not setup");
@@ -193,8 +196,9 @@ public class TestZkClusterManager extends ZkUnitTestBase {
// //////////////////////////////////
- ZkHelixTestManager manager2 =
- new ZkHelixTestManager(clusterName, "localhost_3", InstanceType.PARTICIPANT, ZK_ADDR);
+ MockParticipantManager manager2 =
+ new MockParticipantManager(ZK_ADDR, clusterName, "localhost_3");
+
manager2.setLiveInstanceInfoProvider(new provider(true));
manager2.connect();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index 5b35148..c329e9d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -13,8 +13,8 @@ import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.Verifier;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.LiveInstance;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.testng.Assert;
@@ -134,10 +134,11 @@ public class TestZkFlapping extends ZkUnitTestBase {
"MasterSlave", false);
final String instanceName = "localhost_12918";
- MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
- final ZkClient client = participant.getManager().getZkClient();
+ final ZkClient client = participant.getZkClient();
final ZkStateCountListener listener = new ZkStateCountListener();
client.subscribeStateChanges(listener);
@@ -212,10 +213,11 @@ public class TestZkFlapping extends ZkUnitTestBase {
1, // replicas
"MasterSlave", false);
- ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
controller.syncStart();
- final ZkClient client = controller.getManager().getZkClient();
+ final ZkClient client = controller.getZkClient();
final ZkStateCountListener listener = new ZkStateCountListener();
client.subscribeStateChanges(listener);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
index 249fcea..a62e39d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
@@ -19,13 +19,12 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.util.UUID;
-import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -46,8 +45,8 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
"MasterSlave", true); // do rebalance
String instanceName = "localhost_" + (12918 + 0);
- ZkHelixTestManager manager =
- new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
+ MockParticipantManager manager = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
manager.connect();
ZkClient zkClient = manager.getZkClient();
ZkTestHelper.expireSession(zkClient);
@@ -69,58 +68,59 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
Assert.assertFalse(manager.isConnected());
}
- @Test(enabled = false)
- public void testDisconnectFlappingWindow() throws Exception {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String instanceName = "localhost_" + (12918 + 1);
- final String clusterName = className + "_" + methodName + UUID.randomUUID();
-
- testDisconnectFlappingWindow2(instanceName, InstanceType.PARTICIPANT);
- testDisconnectFlappingWindow2("admin", InstanceType.ADMINISTRATOR);
- }
-
- public void testDisconnectFlappingWindow2(String instanceName, InstanceType type)
- throws Exception {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName + UUID.randomUUID();
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
-
- // flapping time window to 5 sec
- System.setProperty("helixmanager.flappingTimeWindow", "15000");
- System.setProperty("helixmanager.maxDisconnectThreshold", "7");
- ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, instanceName, type, ZK_ADDR);
- manager2.connect();
- ZkClient zkClient = manager2.getZkClient();
- for (int i = 0; i < 3; i++) {
- ZkTestHelper.expireSession(zkClient);
- Thread.sleep(500);
- Assert.assertTrue(manager2.isConnected());
- }
- Thread.sleep(15000);
- // Old entries should be cleaned up
- for (int i = 0; i < 7; i++) {
- ZkTestHelper.expireSession(zkClient);
- Thread.sleep(1000);
- Assert.assertTrue(manager2.isConnected());
- }
- ZkTestHelper.disconnectSession(zkClient);
- for (int i = 0; i < 20; i++) {
- Thread.sleep(500);
- if (!manager2.isConnected())
- break;
- }
- Assert.assertFalse(manager2.isConnected());
- }
+ // TODO test was disabled. check if it is still needed
+ // @Test(enabled = false)
+ // public void testDisconnectFlappingWindow() throws Exception {
+ // String className = TestHelper.getTestClassName();
+ // String methodName = TestHelper.getTestMethodName();
+ // String instanceName = "localhost_" + (12918 + 1);
+ // final String clusterName = className + "_" + methodName + UUID.randomUUID();
+ //
+ // testDisconnectFlappingWindow2(instanceName, InstanceType.PARTICIPANT);
+ // testDisconnectFlappingWindow2("admin", InstanceType.ADMINISTRATOR);
+ // }
+ //
+ // public void testDisconnectFlappingWindow2(String instanceName, InstanceType type)
+ // throws Exception {
+ // String className = TestHelper.getTestClassName();
+ // String methodName = TestHelper.getTestMethodName();
+ // final String clusterName = className + "_" + methodName + UUID.randomUUID();
+ //
+ // TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ // "localhost", // participant name prefix
+ // "TestDB", // resource name prefix
+ // 1, // resources
+ // 10, // partitions per resource
+ // 5, // number of nodes
+ // 3, // replicas
+ // "MasterSlave", true); // do rebalance
+ //
+ // // flapping time window to 15 sec
+ // System.setProperty("helixmanager.flappingTimeWindow", "15000");
+ // System.setProperty("helixmanager.maxDisconnectThreshold", "7");
+ // ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, instanceName, type, ZK_ADDR);
+ // manager2.connect();
+ // ZkClient zkClient = manager2.getZkClient();
+ // for (int i = 0; i < 3; i++) {
+ // ZkTestHelper.expireSession(zkClient);
+ // Thread.sleep(500);
+ // Assert.assertTrue(manager2.isConnected());
+ // }
+ // Thread.sleep(15000);
+ // // Old entries should be cleaned up
+ // for (int i = 0; i < 7; i++) {
+ // ZkTestHelper.expireSession(zkClient);
+ // Thread.sleep(1000);
+ // Assert.assertTrue(manager2.isConnected());
+ // }
+ // ZkTestHelper.disconnectSession(zkClient);
+ // for (int i = 0; i < 20; i++) {
+ // Thread.sleep(500);
+ // if (!manager2.isConnected())
+ // break;
+ // }
+ // Assert.assertFalse(manager2.isConnected());
+ // }
// @Test
public void testDisconnectFlappingWindowController() throws Exception {
@@ -140,8 +140,7 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
// flapping time window to 5 sec
System.setProperty("helixmanager.flappingTimeWindow", "5000");
System.setProperty("helixmanager.maxDisconnectThreshold", "3");
- ZkHelixTestManager manager2 =
- new ZkHelixTestManager(clusterName, null, InstanceType.CONTROLLER, ZK_ADDR);
+ ClusterControllerManager manager2 = new ClusterControllerManager(ZK_ADDR, clusterName, null);
manager2.connect();
Thread.sleep(100);
ZkClient zkClient = manager2.getZkClient();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index 4f15f90..aeb32f9 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -19,68 +19,70 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestZkStateChangeListener extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- @Test
+ // TODO this test has been covered by TestZkFlapping. check if still needed
+ // @Test
public void testDisconnectHistory() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult controllerResult = _startCMResultMap.get(controllerName);
- ZKHelixManager controller = controllerResult._manager;
- ZkStateChangeListener listener1 = new ZkStateChangeListener(controller, 5000, 10);
+ // String controllerName = CONTROLLER_PREFIX + "_0";
+ // StartCMResult controllerResult = _startCMResultMap.get(controllerName);
+ // ZKHelixManager controller = (ZKHelixManager) controllerResult._manager;
+ // ZkStateChangeListener listener1 = new ZkStateChangeListener(controller, 5000, 10);
+ // ZkStateChangeListener listener1 = new ZkStateChangeListener(_controller, 5000, 10);
+
// 11 disconnects in 5 sec
for (int i = 0; i < 11; i++) {
Thread.sleep(200);
- listener1.handleStateChanged(KeeperState.Disconnected);
+ _controller.handleStateChanged(KeeperState.Disconnected);
if (i < 10) {
- Assert.assertTrue(controller.isConnected());
+ Assert.assertTrue(_controller.isConnected());
} else {
- Assert.assertFalse(controller.isConnected());
+ Assert.assertFalse(_controller.isConnected());
}
}
// If maxDisconnectThreshold is 0 it should be set to 1
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- ZKHelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ // String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ // ZKHelixManager manager = (ZKHelixManager) _startCMResultMap.get(instanceName)._manager;
- ZkStateChangeListener listener2 = new ZkStateChangeListener(manager, 5000, 0);
+ // ZkStateChangeListener listener2 = new ZkStateChangeListener(_participants[0], 5000, 0);
for (int i = 0; i < 2; i++) {
Thread.sleep(200);
- listener2.handleStateChanged(KeeperState.Disconnected);
+ _participants[0].handleStateChanged(KeeperState.Disconnected);
if (i < 1) {
- Assert.assertTrue(manager.isConnected());
+ Assert.assertTrue(_participants[0].isConnected());
} else {
- Assert.assertFalse(manager.isConnected());
+ Assert.assertFalse(_participants[0].isConnected());
}
}
// If there are long time after disconnect, older history should be cleanup
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
- manager = _startCMResultMap.get(instanceName)._manager;
+ // instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
+ // manager = (ZKHelixManager) _startCMResultMap.get(instanceName)._manager;
- ZkStateChangeListener listener3 = new ZkStateChangeListener(manager, 5000, 5);
+ // ZkStateChangeListener listener3 = new ZkStateChangeListener(_participants[1], 5000, 5);
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
- listener3.handleStateChanged(KeeperState.Disconnected);
- Assert.assertTrue(manager.isConnected());
+ _participants[1].handleStateChanged(KeeperState.Disconnected);
+ Assert.assertTrue(_participants[1].isConnected());
}
Thread.sleep(5000);
// Old entries should be cleaned up
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
- listener3.handleStateChanged(KeeperState.Disconnected);
- Assert.assertTrue(manager.isConnected());
+ _participants[1].handleStateChanged(KeeperState.Disconnected);
+ Assert.assertTrue(_participants[1].isConnected());
}
for (int i = 0; i < 2; i++) {
Thread.sleep(200);
- listener3.handleStateChanged(KeeperState.Disconnected);
- Assert.assertTrue(manager.isConnected());
+ _participants[1].handleStateChanged(KeeperState.Disconnected);
+ Assert.assertTrue(_participants[1].isConnected());
}
- listener3.handleStateChanged(KeeperState.Disconnected);
- Assert.assertFalse(manager.isConnected());
+ _participants[1].handleStateChanged(KeeperState.Disconnected);
+ Assert.assertFalse(_participants[1].isConnected());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index f4566a0..c71a782 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -77,7 +77,7 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
@Test
public void TestThreadPoolSizeConfig() {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
ConfigScope scope =
@@ -91,9 +91,9 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _startCMResultMap.get(instanceName)._manager.getMessagingService()
- .registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
- _startCMResultMap.get(instanceName)._manager.getMessagingService()
+ _participants[i].getMessagingService().registerMessageHandlerFactory("TestMsg",
+ new TestMessagingHandlerFactory());
+ _participants[i].getMessagingService()
.registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
}
@@ -102,7 +102,7 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
DefaultMessagingService svc =
- (DefaultMessagingService) (_startCMResultMap.get(instanceName)._manager
+ (DefaultMessagingService) (_participants[i]
.getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executor =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 9104866..a5777ab 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -35,8 +35,7 @@ import org.testng.annotations.Test;
public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
@Test
public void TestThreadPoolSizeConfig() {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
ConfigScope scope =
new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
@@ -52,11 +51,8 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
long taskcount = 0;
for (int i = 0; i < NODE_NR; i++) {
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-
DefaultMessagingService svc =
- (DefaultMessagingService) (_startCMResultMap.get(instanceName)._manager
- .getMessagingService());
+ (DefaultMessagingService) (_participants[i].getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executor =
(ThreadPoolExecutor) (helixExecutor._executorMap.get(MessageType.STATE_TRANSITION + "."
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
deleted file mode 100644
index a04a213..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ClusterController extends Thread {
- private static Logger LOG = Logger.getLogger(ClusterController.class);
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
- private final String _controllerMode;
- private final String _zkAddr;
-
- private ZkHelixTestManager _manager;
-
- public ClusterController(String clusterName, String controllerName, String zkAddr)
- throws Exception {
- this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
- }
-
- public ClusterController(String clusterName, String controllerName, String zkAddr,
- String controllerMode) throws Exception {
- _controllerMode = controllerMode;
- _zkAddr = zkAddr;
-
- if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString())) {
- _manager =
- new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
- } else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString())) {
- _manager =
- new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT,
- zkAddr);
- } else {
- throw new IllegalArgumentException("Controller mode: " + controllerMode + " NOT recoginized");
- }
- }
-
- public ZkHelixTestManager getManager() {
- return _manager;
- }
-
- public void syncStop() {
- if (_manager == null) {
- LOG.warn("manager already stopped");
- return;
- }
-
- _stopCountDown.countDown();
- try {
- _waitStopFinishCountDown.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void syncStart() {
- // TODO: prevent start multiple times
-
- super.start();
- try {
- _startCountDown.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- try {
- try {
- if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString())) {
- _manager.connect();
- } else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString())) {
- DistClusterControllerStateModelFactory stateModelFactory =
- new DistClusterControllerStateModelFactory(_zkAddr);
-
- StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- _manager.connect();
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- _startCountDown.countDown();
- _stopCountDown.await();
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- synchronized (_manager) {
- _manager.disconnect();
- _manager = null;
- }
- _waitStopFinishCountDown.countDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
index 59d9a0a..31811bb 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
@@ -33,6 +33,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.healthcheck.HealthReportProvider;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.log4j.Logger;
public class MockHealthReportParticipant {
@@ -209,9 +210,9 @@ public class MockHealthReportParticipant {
// NOT working for kill -9, working for kill -2/-15
static class MockHealthReportParticipantShutdownHook extends Thread {
- final MockParticipant _participant;
+ final MockParticipantManager _participant;
- MockHealthReportParticipantShutdownHook(MockParticipant participant) {
+ MockHealthReportParticipantShutdownHook(MockParticipantManager participant) {
_participant = participant;
}
@@ -231,12 +232,11 @@ public class MockHealthReportParticipant {
String instanceName = hostStr + "_" + portStr;
- MockParticipant participant =
- new MockParticipant(clusterName, instanceName, zkConnectStr, null, // new
- // StoreAccessDiffNodeTransition(),
- // // new
- // StoreAccessOneNodeTransition(),
- new MockHealthReportJob());
+ MockParticipantManager participant =
+ new MockParticipantManager(zkConnectStr, clusterName, instanceName);
+ // participant.setTransition(new StoreAccessDiffNodeTransition());
+ // participant.setTransition(new StoreAccessOneNodeTransition());
+ // new MockHealthReportJob());
Runtime.getRuntime().addShutdownHook(new MockHealthReportParticipantShutdownHook(participant));
// Espresso_driver.py will consume this
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
deleted file mode 100644
index 4030b99..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.helix.mock.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.log4j.Logger;
-
-public class MockParticipant extends Thread {
- private static Logger LOG = Logger.getLogger(MockParticipant.class);
- private final String _clusterName;
- private final String _instanceName;
- // private final String _zkAddr;
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
- private final ZkHelixTestManager _manager;
- private final StateModelFactory _msModelFactory;
- private final MockJobIntf _job;
-
- public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception {
- this(clusterName, instanceName, zkAddr, null, null);
- }
-
- public MockParticipant(String clusterName, String instanceName, String zkAddr,
- MockTransition transition) throws Exception {
- this(clusterName, instanceName, zkAddr, transition, null);
- }
-
- public MockParticipant(String clusterName, String instanceName, String zkAddr,
- MockTransition transition, MockJobIntf job) throws Exception {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _msModelFactory = new MockMSModelFactory(transition);
-
- _manager =
- new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
- _job = job;
- }
-
- public MockParticipant(StateModelFactory factory, String clusterName, String instanceName,
- String zkAddr, MockJobIntf job) throws Exception {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _msModelFactory = factory;
-
- _manager =
- new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
- _job = job;
- }
-
- public StateModelFactory getStateModelFactory() {
- return _msModelFactory;
- }
-
- public MockParticipant(ZkHelixTestManager manager, MockTransition transition) {
- _clusterName = manager.getClusterName();
- _instanceName = manager.getInstanceName();
- _manager = manager;
-
- _msModelFactory = new MockMSModelFactory(transition);
- _job = null;
- }
-
- public void setTransition(MockTransition transition) {
- if (_msModelFactory instanceof MockMSModelFactory) {
- ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
- }
- }
-
- public ZkHelixTestManager getManager() {
- return _manager;
- }
-
- public String getInstanceName() {
- return _instanceName;
- }
-
- public String getClusterName() {
- return _clusterName;
- }
-
- public void syncStop() {
- _stopCountDown.countDown();
- try {
- _waitStopFinishCountDown.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // synchronized (_manager)
- // {
- // _manager.disconnect();
- // }
- }
-
- public void syncStart() {
- super.start();
- try {
- _startCountDown.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- try {
- StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
-
- DummyLeaderStandbyStateModelFactory lsModelFactory =
- new DummyLeaderStandbyStateModelFactory(10);
- DummyOnlineOfflineStateModelFactory ofModelFactory =
- new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
- stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
-
- MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
- stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
- // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
- // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
-
- if (_job != null) {
- _job.doPreConnectJob(_manager);
- }
-
- _manager.connect();
- _startCountDown.countDown();
-
- if (_job != null) {
- _job.doPostConnectJob(_manager);
- }
-
- _stopCountDown.await();
- } catch (InterruptedException e) {
- String msg =
- "participant: " + _instanceName + ", " + Thread.currentThread().getName()
- + " is interrupted";
- LOG.info(msg);
- System.err.println(msg);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- _startCountDown.countDown();
-
- synchronized (_manager) {
- _manager.disconnect();
- }
- _waitStopFinishCountDown.countDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 6de77b2..d97b22a 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -261,4 +261,10 @@ public class MockZKHelixManager implements HelixManager {
return null;
}
+ @Override
+ public void addControllerMessageListener(MessageListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
}