You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:10:00 UTC
[06/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
index 0269764..aea9b70 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.strategy;
* under the License.
*/
-import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -29,8 +28,6 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.ZNRecord;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.testng.Assert;
import org.testng.AssertJUnit;
@@ -68,44 +65,33 @@ public class TestShufflingTwoStateStrategy {
// ByteArrayOutputStream baos = new ByteArrayOutputStream();
StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, result);
- // System.out.println(sw.toString());
-
- ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
- System.out.println(result.toString());
- System.out.println(zn.toString());
- AssertJUnit.assertTrue(zn.toString().equalsIgnoreCase(result.toString()));
- System.out.println();
-
- sw = new StringWriter();
- mapper.writeValue(sw, result2);
-
- ZNRecord zn2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
- System.out.println(result2.toString());
- System.out.println(zn2.toString());
- AssertJUnit.assertTrue(zn2.toString().equalsIgnoreCase(result2.toString()));
-
- sw = new StringWriter();
- mapper.writeValue(sw, result3);
- System.out.println();
-
- ZNRecord zn3 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
- System.out.println(result3.toString());
- System.out.println(zn3.toString());
- AssertJUnit.assertTrue(zn3.toString().equalsIgnoreCase(result3.toString()));
- System.out.println();
-
- } catch (JsonGenerationException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (JsonMappingException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ mapper.writeValue(sw, result);
+ // System.out.println(sw.toString());
+
+ ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ System.out.println(result.toString());
+ System.out.println(zn.toString());
+ AssertJUnit.assertTrue(zn.toString().equalsIgnoreCase(result.toString()));
+ System.out.println();
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, result2);
+
+ ZNRecord zn2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ System.out.println(result2.toString());
+ System.out.println(zn2.toString());
+ AssertJUnit.assertTrue(zn2.toString().equalsIgnoreCase(result2.toString()));
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, result3);
+ System.out.println();
+
+ ZNRecord zn3 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ System.out.println(result3.toString());
+ System.out.println(zn3.toString());
+ AssertJUnit.assertTrue(zn3.toString().equalsIgnoreCase(result3.toString()));
+ System.out.println();
+
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
index e0e1544..cadbdc7 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -28,17 +28,15 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -49,7 +47,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestAddDropAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -58,15 +55,11 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
-
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class AddDropAlertTransition extends MockTransition {
@@ -114,7 +107,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
@Test()
public void testAddDropAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestAddDropAlert at " + new Date(System.currentTimeMillis()));
@@ -130,18 +123,18 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new AddDropAlertTransition());
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new AddDropAlertTransition());
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -152,10 +145,10 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
// drop alert soon after adding, but leave enough time for alert to fire once
// Thread.sleep(3000);
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
String instance = "localhost_12918";
ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -163,7 +156,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
Assert.assertTrue(keySet.size() > 0);
_setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// other verifications go here
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index f1d2ba6..37f8205 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -38,10 +38,13 @@ import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
+
String _statName = "TestStat@DB=db1";
String _stat = "TestStat";
String metricName1 = "TestMetric1";
@@ -49,8 +52,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
void setHealthData(int[] val1, int[] val2) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName1, val1[i] + "");
@@ -66,15 +68,13 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("sleep interrupted", e);
}
}
void setHealthData2(int[] val1) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName2, val1[i] + "");
@@ -89,8 +89,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("sleep interrupted", e);
}
}
@@ -125,11 +124,9 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(manager);
task.aggregate();
Thread.sleep(4000);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 24595d0..8618b1c 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -27,14 +27,16 @@ import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.HelixTimerTask;
import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
import org.apache.helix.model.AlertHistory;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -44,6 +46,8 @@ import org.testng.annotations.Test;
*/
public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
+
String _statName = "TestStat@DB=db1";
String _stat = "TestStat";
String metricName1 = "TestMetric1";
@@ -55,8 +59,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
void setHealthData(int[] val1, int[] val2) {
for (int i = 0; i < NODE_NR; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+ HelixManager manager = _participants[i];
ZNRecord record = new ZNRecord(_stat);
Map<String, String> valMap = new HashMap<String, String>();
valMap.put(metricName1, val1[i] + "");
@@ -72,13 +75,12 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error("Interrupted sleep", e);
}
}
@Test
- public void TestAlertDisable() throws InterruptedException {
+ public void testAlertDisable() throws InterruptedException {
int[] metrics1 = {
10, 15, 22, 24, 16
@@ -88,29 +90,27 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
manager.startTimerTasks();
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
- // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
Map<String, String> properties = new HashMap<String, String>();
properties.put("healthChange.enabled", "false");
_setupTool.getClusterManagementTool().setConfig(scope, properties);
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
task.aggregate();
Thread.sleep(100);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
- //
+
Assert.assertEquals(history, null);
properties.put("healthChange.enabled", "true");
@@ -126,7 +126,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test
- public void TestAlertHistory() throws InterruptedException {
+ public void testAlertHistory() throws InterruptedException {
int[] metrics1 = {
10, 15, 22, 24, 16
};
@@ -135,9 +135,10 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
};
setHealthData(metrics1, metrics2);
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
- manager.stopTimerTasks();
+ HelixManager manager = _controller;
+ for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
+ task.stop();
+ }
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
_setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
@@ -152,8 +153,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
historySize = property.getRecord().getMapFields().size();
}
- HealthStatsAggregator task =
- new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
task.aggregate();
Thread.sleep(100);
@@ -419,3 +420,4 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
index efbd3b4..29f4893 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
@@ -29,10 +29,10 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.Message;
@@ -73,7 +73,7 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
String clusterName = className + "_" + methodName;
final int n = 5;
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -95,15 +95,16 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
"EXP(decay(1.0)(*.defaultPerfCounters@defaultPerfCounters.availableCPUs))CMP(GREATER)CON(2)");
// start controller
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new DummyAlertsTransition());
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new DummyAlertsTransition());
participants[i].syncStart();
}
@@ -137,7 +138,6 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
}
// clean up
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index f429b5f..23741c3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestExpandAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(16)";
@@ -60,15 +57,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class ExpandAlertTransition extends MockTransition {
@@ -119,7 +113,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
@Test()
public void testExpandAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestExpandAlert at " + new Date(System.currentTimeMillis()));
@@ -135,18 +129,19 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new ExpandAlertTransition());
- participants[i].start();
- // new Thread(participants[i]).start();
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new ExpandAlertTransition());
+ participants[i].syncStart();
}
boolean result =
@@ -157,13 +152,14 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ // new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -180,6 +176,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
Assert.assertFalse(fired);
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+
+ }
System.out.println("END TestExpandAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index 6d33df0..dbbd7aa 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestSimpleAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -59,15 +56,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
-
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class SimpleAlertTransition extends MockTransition {
@@ -124,7 +117,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
@Test()
public void testSimpleAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
@@ -139,10 +132,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
// enableHealthCheck(clusterName);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
- cmResult._manager.startTimerTasks();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+ controller.startTimerTasks();
+
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
@@ -150,9 +144,9 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(15));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SimpleAlertTransition(15));
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -162,13 +156,14 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ // new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
@@ -196,6 +191,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
.equals("ON"));
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index 3eb31ed..90223ad 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -27,29 +27,29 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
+ private static Logger LOG = Logger.getLogger(TestSimpleWildcardAlert.class);
+
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -58,15 +58,12 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class SimpleAlertTransition extends MockTransition {
@@ -123,7 +120,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
@Test()
public void testSimpleWildcardAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
@@ -138,10 +135,10 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
// enableHealthCheck(clusterName);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
- cmResult._manager.stopTimerTasks();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+ controller.stopTimerTasks();
String alertwildcard =
"EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -153,9 +150,9 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
String instanceName = "localhost_" + (12944 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(i * 5));
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new SimpleAlertTransition(i * 5));
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -166,13 +163,13 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(1000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -208,7 +205,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
alertwildcard =
"EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
_setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
Thread.sleep(1000);
record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
@@ -240,6 +237,11 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
Assert.assertTrue(delta.get(alertString).equals("ON"));
}
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 2661560..76784d3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestStalenessAlert extends ZkIntegrationTestBase {
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr = "EXP(decay(1)(localhost_*.reportingage))CMP(GREATER)CON(600)";
protected final String _alertStatusStr = _alertStr + " : (localhost_12918.reportingage)";
@@ -58,15 +55,12 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class StalenessAlertTransition extends MockTransition {
@@ -117,7 +111,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
@Test()
public void testStalenessAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestStalenessAlert at " + new Date(System.currentTimeMillis()));
@@ -133,18 +127,19 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new StalenessAlertTransition());
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new StalenessAlertTransition());
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
boolean result =
@@ -154,13 +149,13 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
// other verifications go here
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
@@ -176,6 +171,11 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
// Assert.assertFalse(fired);
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END TestStalenessAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index 5265ebb..79c85ca 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -43,18 +43,16 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
@@ -125,7 +123,6 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
}
private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
- ZkClient _zkClient;
protected ClusterSetup _setupTool = null;
protected final String _alertStr =
"EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
@@ -134,15 +131,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
}
@AfterClass
public void afterClass() {
- _zkClient.close();
}
public class WildcardAlertTransition extends MockTransition {
@@ -208,7 +202,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
@Test()
public void testWildcardAlert() throws Exception {
String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
@@ -227,18 +221,18 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
_setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
// _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr2);
- StartCMResult cmResult =
- TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
for (int i = 0; i < 5; i++) // !!!change back to 5
{
String instanceName = "localhost_" + (12918 + i);
participants[i] =
- new MockParticipant(clusterName, instanceName, ZK_ADDR, new WildcardAlertTransition());
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].setTransition(new WildcardAlertTransition());
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
TestClusterMBeanObserver jmxMBeanObserver =
@@ -251,13 +245,13 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
Thread.sleep(3000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregator(cmResult._manager).aggregate();
+ new HealthStatsAggregator(controller).aggregate();
// sleep for a few seconds to give stats stage time to trigger and for bean to trigger
Thread.sleep(3000);
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
// for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -292,6 +286,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
"EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)");
// }
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 82e45cc..32fdcff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -20,14 +20,9 @@ package org.apache.helix.integration;
*/
import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -44,13 +39,15 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
protected static final int START_PORT = 12918;
protected static final String STATE_MODEL = "MasterSlave";
protected ClusterSetup _setupTool = null;
- protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
protected final String CLASS_NAME = getShortClassName();
protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
protected static final String TEST_DB = "TestDB";
+ MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+ ClusterDistributedController[] _distControllers = new ClusterDistributedController[NODE_NR];
+
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
@@ -88,29 +85,18 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
"MasterSlave", 3, true);
// start dummy participants for the first cluster
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start participant:" + instanceName
- + "(participant with the same name already running");
- } else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster, instanceName);
- _startCMResultMap.put(instanceName, result);
- }
+ _participants[i] = new MockParticipantManager(ZK_ADDR, firstCluster, instanceName);
+ _participants[i].syncStart();
}
// start distributed cluster controllers
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < NODE_NR; i++) {
String controllerName = CONTROLLER_PREFIX + "_" + i;
- if (_startCMResultMap.get(controllerName) != null) {
- LOG.error("fail to start controller:" + controllerName
- + "(controller with the same name already running");
- } else {
- StartCMResult result =
- TestHelper.startController(CONTROLLER_CLUSTER, controllerName, ZK_ADDR,
- HelixControllerMain.DISTRIBUTED);
- _startCMResultMap.put(controllerName, result);
- }
+ _distControllers[i] =
+ new ClusterDistributedController(ZK_ADDR, CONTROLLER_CLUSTER, controllerName);
+ _distControllers[i].syncStart();
}
verifyClusters();
@@ -132,36 +118,22 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
* 3) disconnect leader/disconnect participant
*/
String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
- // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
-
- StartCMResult result;
-
- Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX)) {
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
+ int leaderIdx = -1;
+ for (int i = 0; i < NODE_NR; i++) {
+ if (!_distControllers[i].getInstanceName().equals(leader)) {
+ _distControllers[i].syncStop();
+ verifyClusters();
+ } else {
+ leaderIdx = i;
}
- verifyClusters();
}
+ Assert.assertNotSame(leaderIdx, -1);
- result = _startCMResultMap.remove(leader);
- result._manager.disconnect();
- result._thread.interrupt();
-
- it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext()) {
- String instanceName = it.next().getKey();
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
- }
+ _distControllers[leaderIdx].syncStop();
+ for (int i = 0; i < NODE_NR; i++) {
+ _participants[i].syncStop();
+ }
System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index d1014ed..79d8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -22,15 +22,14 @@ package org.apache.helix.integration;
import java.util.Date;
import java.util.List;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -50,16 +49,17 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 20, nodeNr - 1,
3, "MasterSlave", true);
- MockParticipant[] participants = new MockParticipant[nodeNr];
+ MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
for (int i = 0; i < nodeNr - 1; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- new Thread(participants[i]).start();
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
}
- ZkHelixTestManager controller =
- new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
- controller.connect();
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
boolean result;
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -73,7 +73,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
- participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+ participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12922");
new Thread(participants[nodeNr - 1]).start();
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -84,11 +84,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
- // controller.disconnect();
- // for (int i = 0; i < nodeNr; i++)
- // {
- // participants[i].syncStop();
- // }
+ controller.syncStop();
+ for (int i = 0; i < nodeNr; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@@ -99,11 +98,13 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
// setup grand cluster
- TestHelper.setupCluster("GRAND_" + clusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
+ final String grandClusterName = "GRAND_" + clusterName;
+ TestHelper.setupCluster(grandClusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
null, true);
- TestHelper.startController("GRAND_" + clusterName, "controller_0", ZK_ADDR,
- HelixControllerMain.DISTRIBUTED);
+ ClusterDistributedController distController =
+ new ClusterDistributedController(ZK_ADDR, grandClusterName, "controller_0");
+ distController.syncStart();
// setup cluster
_gSetupTool.addCluster(clusterName, true);
@@ -125,12 +126,11 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
- MockParticipant[] participants = new MockParticipant[nodeNr];
+ MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
for (int i = 0; i < nodeNr - 1; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
- // new Thread(participants[i]).start();
}
result =
@@ -148,10 +148,8 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
_gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
_gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
- participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+ participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12919");
participants[nodeNr - 1].syncStart();
- // new Thread(participants[nodeNr - 1]).start();
-
result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName));
@@ -163,10 +161,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
// clean up
- // for (int i = 0; i < nodeNr; i++)
- // {
- // participants[i].syncStop();
- // }
+ distController.syncStop();
+ for (int i = 0; i < nodeNr; i++) {
+ participants[i].syncStop();
+ }
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index ba4eee2..123ce6e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -28,9 +28,9 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +50,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipant[] participants = new MockParticipant[n];
+ MockParticipantManager[] participants = new MockParticipantManager[n];
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
3, // replicas
"MasterSlave", true); // do rebalance
- ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -114,7 +115,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
// register "TestDB1_Factory" state model factory
// Logger.getRootLogger().setLevel(Level.INFO);
for (int i = 0; i < n; i++) {
- participants[i].getManager().getStateMachineEngine()
+ participants[i].getStateMachineEngine()
.registerStateModelFactory("MasterSlave", new MockMSModelFactory(), "TestDB1_Factory");
}
@@ -125,7 +126,6 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
// clean up
// wait for all zk callbacks done
- Thread.sleep(1000);
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
index a008814..fe23cb9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -27,9 +27,9 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -74,15 +74,16 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
_gZkClient.writeData(idealPath, curIdealState);
// start controller
- TestHelper
- .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
// start participants
- MockParticipant[] participants = new MockParticipant[5];
+ MockParticipantManager[] participants = new MockParticipantManager[5];
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
}
@@ -92,6 +93,7 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// clean up
+ controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 0e7f4fa..2aec114 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -28,13 +28,13 @@ import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
@@ -62,13 +62,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
// setup storage cluster
_setupTool.addCluster(CLUSTER_NAME, true);
@@ -95,24 +93,21 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start particpant:" + instanceName
- + "(participant with same name already exists)");
- } else {
- StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, result);
- }
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ _participants[i] = participant;
+
}
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -128,7 +123,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, "MyDB"));
Assert.assertTrue(result);
@@ -146,7 +141,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, "MyDB2"));
Assert.assertTrue(result);
@@ -160,16 +155,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Test()
public void testAutoRebalance() throws Exception {
-
// kill 1 node
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
- // verifyBalanceExternalView();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -178,22 +168,22 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- StartCMResult resultx =
- TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
- _startCMResultMap.put(storageNodeName, resultx);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+ participant.syncStart();
}
Thread.sleep(5000);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, db2));
Assert.assertTrue(result);
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
Set<String> instancesSet = new HashSet<String>();
@@ -246,12 +236,10 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
}
public static class ExternalViewBalancedVerifier implements ZkVerifier {
- ZkClient _client;
String _clusterName;
String _resourceName;
public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
- _client = client;
_clusterName = clusterName;
_resourceName = resourceName;
}
@@ -259,7 +247,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Override
public boolean verify() {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
if (idealState == null) {
@@ -298,7 +286,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
@Override
public ZkClient getZkClient() {
- return _client;
+ return _gZkClient;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 3523461..5f9f48c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -27,12 +27,12 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
@@ -58,13 +58,11 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
// Logger.getRootLogger().setLevel(Level.INFO);
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
String namespace = "/" + CLUSTER_NAME;
- if (_zkClient.exists(namespace)) {
- _zkClient.deleteRecursive(namespace);
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
}
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_gZkClient);
// setup storage cluster
_setupTool.addCluster(CLUSTER_NAME, true);
@@ -79,40 +77,33 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
HelixDataAccessor accessor = manager.getHelixDataAccessor();
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- if (_startCMResultMap.get(instanceName) != null) {
- LOG.error("fail to start particpant:" + instanceName
- + "(participant with same name already exists)");
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
+ Thread.sleep(2000);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ ExternalView ev =
+ manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ System.out.println(ev.getPartitionSet().size());
+ if (i < 3) {
+ Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
} else {
- startResult = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
- _startCMResultMap.put(instanceName, startResult);
- Thread.sleep(2000);
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
- CLUSTER_NAME, TEST_DB));
- Assert.assertTrue(result);
- ExternalView ev =
- manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
- System.out.println(ev.getPartitionSet().size());
- if (i < 3) {
- Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
- } else {
- Assert.assertEquals(ev.getPartitionSet().size(), 100);
- }
+ Assert.assertEquals(ev.getPartitionSet().size(), 100);
}
}
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
@@ -120,49 +111,50 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Test()
public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixManager manager = _controller;
// kill 1 node
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[0].syncStop();
// verifyBalanceExternalView();
boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ final HelixDataAccessor accessor = manager.getHelixDataAccessor();
ExternalView ev =
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
Assert.assertEquals(ev.getPartitionSet().size(), 100);
- instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
- _startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.sleep(1000);
- _startCMResultMap.get(instanceName)._thread.interrupt();
+ _participants[1].syncStop();
// verifyBalanceExternalView();
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
- ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
- Assert.assertEquals(ev.getPartitionSet().size(), 75);
+ result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ return ev.getPartitionSet().size() == 75;
+ }
+ }, 3 * 1000);
// add 2 nodes
for (int i = 0; i < 2; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- StartCMResult resultx =
- TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
- _startCMResultMap.put(storageNodeName, resultx);
+ String newInstanceName = storageNodeName.replace(':', '_');
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName);
+ participant.syncStart();
}
- Thread.sleep(1000);
+
+ // Thread.sleep(1000);
result =
- ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
CLUSTER_NAME, TEST_DB));
Assert.assertTrue(result);
}
@@ -213,12 +205,10 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
}
public static class ExternalViewBalancedVerifier implements ZkVerifier {
- ZkClient _client;
String _clusterName;
String _resourceName;
public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
- _client = client;
_clusterName = clusterName;
_resourceName = resourceName;
}
@@ -226,7 +216,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Override
public boolean verify() {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
int numberOfPartitions = idealState.getRecord().getListFields().size();
@@ -244,7 +234,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Override
public ZkClient getZkClient() {
- return _client;
+ return _gZkClient;
}
@Override