You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:18 UTC
[05/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager
class
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index 7f004d3..b00e26c 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -40,10 +40,13 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
+
String _statName = "TestStat@DB=db1";
String _stat = "TestStat";
String metricName1 = "TestMetric1";
@@ -51,8 +54,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
void setHealthData(int[] val1, int[] val2) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName1, val1[i] + "");
@@ -68,15 +70,13 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("sleep interrupted", e);
}
}
void setHealthData2(int[] val1) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName2, val1[i] + "");
@@ -91,8 +91,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("sleep interrupted", e);
}
}
@@ -127,11 +126,9 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(manager);
task.aggregate();
Thread.sleep(4000);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 125f61f..c18b643 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -31,12 +31,15 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
import org.apache.helix.model.AlertHistory;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -46,6 +49,8 @@ import org.testng.annotations.Test;
*/
public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
+
String _statName = "TestStat@DB=db1";
String _stat = "TestStat";
String metricName1 = "TestMetric1";
@@ -57,8 +62,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
void setHealthData(int[] val1, int[] val2) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName1, val1[i] + "");
@@ -74,13 +78,12 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted sleep", e);
}
}
@Test
- public void TestAlertDisable() throws InterruptedException {
+ public void testAlertDisable() throws InterruptedException {
int[] metrics1 = {
10, 15, 22, 24, 16
@@ -90,29 +93,27 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
manager.startTimerTasks();
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
- // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
Map<String, String> properties = new HashMap<String, String>();
properties.put("healthChange.enabled", "false");
_setupTool.getClusterManagementTool().setConfig(scope, properties);
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
task.aggregate();
Thread.sleep(100);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
- //
+
Assert.assertEquals(history, null);
properties.put("healthChange.enabled", "true");
@@ -128,7 +129,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test
- public void TestAlertHistory() throws InterruptedException {
+ public void testAlertHistory() throws InterruptedException {
int[] metrics1 = {
10, 15, 22, 24, 16
};
@@ -137,9 +138,10 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
- manager.stopTimerTasks();
+ HelixManager manager = _controller;
+ for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
+ task.stop();
+ }
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
@@ -154,8 +156,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
historySize = property.getRecord().getMapFields().size();
}
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
task.aggregate();
Thread.sleep(100);
@@ -421,3 +423,4 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
index c5f373c..b8bd634 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
@@ -29,10 +29,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.Message;
@@ -73,7 +73,7 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
String clusterName = className + "_" + methodName;
final int n = 5;
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -95,15 +95,16 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
"EXP(decay(1.0)(*.defaultPerfCounters@defaultPerfCounters.availableCPUs))CMP(GREATER)CON(2)");
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new DummyAlertsTransition());
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new DummyAlertsTransition());
participants[i].syncStart();
}
@@ -137,7 +138,6 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
}
// clean up
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index 69d1062..69b52e7 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestExpandAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(16)";
@@ -61,15 +56,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class ExpandAlertTransition extends MockTransition {
@@ -120,7 +112,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
@Test()
public void testExpandAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestExpandAlert at " + new Date(System.currentTimeMillis()));
@@ -136,18 +128,19 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new ExpandAlertTransition());
- participants[i].start();
- // new Thread(participants[i]).start();
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ExpandAlertTransition());
+ participants[i].syncStart();
}
boolean result =
@@ -158,13 +151,14 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ // new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -181,6 +175,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
Assert.assertFalse(fired);
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+
+ }
System.out.println("END TestExpandAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index 1db5ddd..ccc0a79 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestSimpleAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -60,15 +55,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
-
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class SimpleAlertTransition extends MockTransition {
@@ -125,7 +116,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
@Test()
public void testSimpleAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
@@ -140,10 +131,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
// enableHealthCheck(clusterName);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
- cmResult._manager.startTimerTasks();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+ controller.startTimerTasks();
+
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
@@ -151,9 +143,9 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(15));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SimpleAlertTransition(15));
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -163,13 +155,14 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ // new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
@@ -197,6 +190,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
.equals("ON"));
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index c5b55da..417a53a 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -28,29 +28,27 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
+ private static Logger LOG = Logger.getLogger(TestSimpleWildcardAlert.class);
+
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -59,15 +57,12 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class SimpleAlertTransition extends MockTransition {
@@ -124,7 +119,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
@Test()
public void testSimpleWildcardAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
@@ -139,10 +134,10 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
// enableHealthCheck(clusterName);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
- cmResult._manager.stopTimerTasks();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+ controller.stopTimerTasks();
String alertwildcard =
"EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -154,9 +149,9 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12944 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(i * 5));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SimpleAlertTransition(i * 5));
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -167,13 +162,13 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(1000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -209,7 +204,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
alertwildcard =
"EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
_setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
Thread.sleep(1000);
record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
@@ -241,6 +236,11 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
Assert.assertTrue(delta.get(alertString).equals("ON"));
}
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 2304b41..cdb7d1d 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestStalenessAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr = "EXP(decay(1)(localhost_*.reportingage))CMP(GREATER)CON(600)";
protected final String _alertStatusStr = _alertStr + " : (localhost_12918.reportingage)";
@@ -59,15 +54,12 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class StalenessAlertTransition extends MockTransition {
@@ -118,7 +110,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
@Test()
public void testStalenessAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestStalenessAlert at " + new Date(System.currentTimeMillis()));
@@ -134,18 +126,19 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new StalenessAlertTransition());
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new StalenessAlertTransition());
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -155,13 +148,13 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
@@ -177,6 +170,11 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
// Assert.assertFalse(fired);
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END TestStalenessAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index a0456a7..cc819de 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -44,18 +44,14 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
@@ -126,7 +122,6 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
}
private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
@@ -135,15 +130,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class WildcardAlertTransition extends MockTransition {
@@ -209,7 +201,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
@Test()
public void testWildcardAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
@@ -228,18 +220,18 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
// _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr2);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new WildcardAlertTransition());
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new WildcardAlertTransition());
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
TestClusterMBeanObserver jmxMBeanObserver =
@@ -252,13 +244,13 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
Thread.sleep(3000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger and for bean to trigger
Thread.sleep(3000);
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -293,6 +285,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
"EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)");
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 8547666..32fdcff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -20,16 +20,9 @@ package org.apache.helix.integration;
*/
import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.PropertyType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.model.PauseSignal;
+
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -46,13 +39,15 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
protected static final int START_PORT = 12918;
protected static final String STATE_MODEL = "MasterSlave";
protected ClusterSetup _setupTool = null;
- protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
protected final String CLASS_NAME = getShortClassName();
protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
protected static final String TEST_DB = "TestDB";
+ MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+ ClusterDistributedController[] _distControllers = new ClusterDistributedController[NODE_NR];
+
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
@@ -90,29 +85,18 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
"MasterSlave", 3, true);
// start dummy participants for the first cluster
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start participant:" + instanceName
- + "(participant with the same name already running");
- } else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster, instanceName);
- _startCMResultMap.put(instanceName, result);
- }
+ _participants[i] = new MockParticipantManager(ZK_ADDR, firstCluster, instanceName);
+ _participants[i].syncStart();
}
// start distributed cluster controllers
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < NODE_NR; i++) {
String controllerName = CONTROLLER_PREFIX + "_" + i;
- if (_startCMResultMap.get(controllerName) != null) {
- LOG.error("fail to start controller:" + controllerName
- + "(controller with the same name already running");
- } else {
- StartCMResult result =
- TestHelper.startController(CONTROLLER_CLUSTER, controllerName, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED);
- _startCMResultMap.put(controllerName, result);
- }
+ _distControllers[i] =
+ new ClusterDistributedController(ZK_ADDR, CONTROLLER_CLUSTER, controllerName);
+ _distControllers[i].syncStart();
}
verifyClusters();
@@ -134,36 +118,22 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
* 3) disconnect leader/disconnect participant
*/
String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
- // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
-
- StartCMResult result;
-
- Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX)) {
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
+ int leaderIdx = -1;
+ for (int i = 0; i < NODE_NR; i++) {
+ if (!_distControllers[i].getInstanceName().equals(leader)) {
+ _distControllers[i].syncStop();
+ verifyClusters();
+ } else {
+ leaderIdx = i;
}
- verifyClusters();
}
+ Assert.assertNotSame(leaderIdx, -1);
- result = _startCMResultMap.remove(leader);
- result._manager.disconnect();
- result._thread.interrupt();
-
- it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
- }
+ _distControllers[leaderIdx].syncStop();
+ for (int i = 0; i < NODE_NR; i++) {
+ _participants[i].syncStop();
+ }
System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index b135d92..79d8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -21,18 +21,15 @@ package org.apache.helix.integration;
import java.util.Date;
import java.util.List;
-import java.util.Set;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -52,16 +49,17 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 20, nodeNr - 1,
3, "MasterSlave", true);
- MockParticipant[] participants = new MockParticipant[nodeNr];
+ MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
for (int i = 0; i < nodeNr - 1; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- new Thread(participants[i]).start();
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
}
- ZkHelixTestManager controller =
- new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
boolean result;
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -75,7 +73,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
- participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+ participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12922");
new Thread(participants[nodeNr - 1]).start();
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -86,11 +84,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
- // controller.disconnect();
- // for (int i = 0; i < nodeNr; i++)
- // {
- // participants[i].syncStop();
- // }
+ controller.syncStop();
+ for (int i = 0; i < nodeNr; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -101,11 +98,13 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
// setup grand cluster
- TestHelper.setupCluster("GRAND_" + clusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
+ final String grandClusterName = "GRAND_" + clusterName;
+ TestHelper.setupCluster(grandClusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
null, true);
- TestHelper.startController("GRAND_" + clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.DISTRIBUTED);
+ ClusterDistributedController distController =
+ new ClusterDistributedController(ZK_ADDR, grandClusterName, "controller_0");
+ distController.syncStart();
// setup cluster
_gSetupTool.addCluster(clusterName, true);
@@ -127,12 +126,11 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
- MockParticipant[] participants = new MockParticipant[nodeNr];
+ MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
for (int i = 0; i < nodeNr - 1; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
result =
@@ -150,10 +148,8 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
- participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+ participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12919");
participants[nodeNr - 1].syncStart();
- // new Thread(participants[nodeNr - 1]).start();
-
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName));
@@ -165,10 +161,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
// clean up
- // for (int i = 0; i < nodeNr; i++)
- // {
- // participants[i].syncStop();
- // }
+ distController.syncStop();
+ for (int i = 0; i < nodeNr; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 33938ad..cd888d7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -19,20 +19,17 @@ package org.apache.helix.integration;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -52,7 +49,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -63,14 +60,15 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
3, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -116,7 +114,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
// register "TestDB1_Factory" state model factory
// Logger.getRootLogger().setLevel(Level.INFO);
for (int i = 0; i < n; i++) {
- participants[i].getManager().getStateMachineEngine()
+ participants[i].getStateMachineEngine()
.registerStateModelFactory("MasterSlave", new MockMSModelFactory(), "TestDB1_Factory");
}
@@ -127,7 +125,6 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
index 1ffb86f..bc1c1b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -27,8 +27,8 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.DefaultIdealStateCalculator;
@@ -74,15 +74,16 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
_gZkClient.writeData(idealPath, curIdealState);
// start controller
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -92,6 +93,7 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 1943364..b4f9223 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -29,11 +29,10 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
@@ -51,18 +50,17 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
String db2 = TEST_DB + "2";
String _tag = "SSDSSD";
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
// setup storage cluster
_setupTool.addCluster(CLUSTER_NAME, true);
@@ -89,24 +87,21 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start particpant:" + instanceName
- + "(participant with same name already exists)");
- } else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, result);
- }
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ _participants[i] = participant;
+
}
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -122,7 +117,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, "MyDB"));
Assert.assertTrue(result);
@@ -140,7 +135,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, "MyDB2"));
Assert.assertTrue(result);
@@ -154,16 +149,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Test()
public void testAutoRebalance() throws Exception {
-
// kill 1 node
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.currentThread().sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
- // verifyBalanceExternalView();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -172,22 +162,22 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- StartCMResult resultx =
- TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
- _startCMResultMap.put(storageNodeName, resultx);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+ participant.syncStart();
}
Thread.sleep(1000);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, db2));
Assert.assertTrue(result);
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
Set<String> instancesSet = new HashSet<String>();
@@ -242,12 +232,10 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
}
public static class ExternalViewBalancedVerifier implements ZkVerifier {
- ZkClient _client;
String _clusterName;
String _resourceName;
public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
- _client = client;
_clusterName = clusterName;
_resourceName = resourceName;
}
@@ -255,7 +243,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Override
public boolean verify() {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
int numberOfPartitions =
accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
@@ -286,7 +274,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Override
public ZkClient getZkClient() {
- return _client;
+ return _gZkClient;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 32cafcf..74a5699 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -25,14 +25,12 @@ import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
@@ -49,16 +47,15 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
.getName());
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
_setupTool = new ClusterSetup(ZK_ADDR);
@@ -75,40 +72,33 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
HelixDataAccessor accessor = manager.getHelixDataAccessor();
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start particpant:" + instanceName
- + "(participant with same name already exists)");
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
+ Thread.sleep(2000);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ ExternalView ev =
+ manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ System.out.println(ev.getPartitionSet().size());
+ if (i < 3) {
+ Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
} else {
- startResult = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, startResult);
- Thread.sleep(2000);
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
- CLUSTER_NAME, TEST_DB));
- Assert.assertTrue(result);
- ExternalView ev =
- manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
- System.out.println(ev.getPartitionSet().size());
- if (i < 3) {
- Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
- } else {
- Assert.assertEquals(ev.getPartitionSet().size(), 100);
- }
+ Assert.assertEquals(ev.getPartitionSet().size(), 100);
}
}
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -116,17 +106,13 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Test()
public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
// kill 1 node
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.currentThread().sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
// verifyBalanceExternalView();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
HelixDataAccessor accessor = manager.getHelixDataAccessor();
@@ -134,14 +120,11 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
Assert.assertEquals(ev.getPartitionSet().size(), 100);
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.currentThread().sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[1].syncStop();
// verifyBalanceExternalView();
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
@@ -152,13 +135,15 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- StartCMResult resultx =
- TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
- _startCMResultMap.put(storageNodeName, resultx);
+ String newInstanceName = storageNodeName.replace(':', '_');
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName);
+ participant.syncStart();
}
+
Thread.sleep(1000);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
}
@@ -209,12 +194,10 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
}
public static class ExternalViewBalancedVerifier implements ZkVerifier {
- ZkClient _client;
String _clusterName;
String _resourceName;
public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
- _client = client;
_clusterName = clusterName;
_resourceName = resourceName;
}
@@ -222,7 +205,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Override
public boolean verify() {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
int numberOfPartitions =
accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
@@ -240,7 +223,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Override
public ZkClient getZkClient() {
- return _client;
+ return _gZkClient;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index bf2de1e..03fc85b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -30,13 +30,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.HelixProperty.HelixPropertyAttribute;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
@@ -91,15 +88,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
TestZkChildListener listener = new TestZkChildListener();
_gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -112,7 +110,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
@@ -141,15 +138,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
2, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -159,7 +157,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// stop all participants
- Thread.sleep(1000);
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
@@ -180,7 +177,7 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -193,7 +190,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
@@ -209,10 +205,9 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
String clusterName = className + "_" + methodName;
final int n = 5;
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- // ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource#
6, // partition#
@@ -228,19 +223,20 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
idealState.setBatchMessageMode(true);
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
if (i == 1) {
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR,
- new ErrTransition(errPartitions));
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ErrTransition(errPartitions));
} else {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
}
participants[i].syncStart();
}
@@ -297,7 +293,8 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
TestZkChildListener listener = new TestZkChildListener();
_gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// pause controller
@@ -307,11 +304,11 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
});
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -335,7 +332,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < n; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 2ae8bf3..9bfdfb5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -26,12 +26,12 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.messaging.handling.BatchMessageWrapper;
-import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -90,17 +90,19 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
idealState.setBatchMessageMode(true);
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestMockMSModelFactory[] ftys = new TestMockMSModelFactory[n];
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
ftys[i] = new TestMockMSModelFactory();
- participants[i] = new MockParticipant(ftys[i], clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
participants[i].syncStart();
// wait for each participant to complete state transitions, so we have deterministic results
@@ -133,6 +135,12 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
Assert.assertEquals(wrapper._startCount, 2,
"Expect 2 batch.end: O->S and S->M for 2nd participant");
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 8e75537..207a318 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -24,10 +24,10 @@ import java.util.Date;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -45,7 +45,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
// ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
@@ -66,13 +66,15 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
idealState.setBucketSize(1);
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -87,6 +89,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}