You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 22:57:19 UTC
[4/4] helix git commit: Fix TestZkConnectionLost to use a separate ZK
server to avoid blocking other tests.
Fix TestZkConnectionLost to use a separate ZK server to avoid blocking other tests.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aaa632f3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aaa632f3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aaa632f3
Branch: refs/heads/master
Commit: aaa632f3d53a0268bfbf1b7ed18996cdc72894a4
Parents: 2049f93
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu May 17 13:29:31 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Mon Jul 9 15:56:40 2018 -0700
----------------------------------------------------------------------
.../helix/integration/TestZkConnectionLost.java | 22 ++++++++--------
.../helix/task/TaskSynchronizedTestBase.java | 27 +++++++++++++++++---
2 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/aaa632f3/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index 6fb966e..3721b2c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -34,30 +34,33 @@ public class TestZkConnectionLost extends TaskTestBase {
private final AtomicReference<ZkServer> _zkServerRef = new AtomicReference<>();
+ private String _zkAddr = "localhost:2189";
+
@BeforeClass
public void beforeClass() throws Exception {
+ ZkServer zkServer = TestHelper.startZkServer(_zkAddr);
+ _zkServerRef.set(zkServer);
+
_participants = new MockParticipantManager[_numNodes];
String namespace = "/" + CLUSTER_NAME;
if (_gZkClient.exists(namespace)) {
_gZkClient.deleteRecursively(namespace);
}
- _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool = new ClusterSetup(_zkAddr);
_setupTool.addCluster(CLUSTER_NAME, true);
setupParticipants();
setupDBs();
- createManagers();
+ createManagers(_zkAddr, CLUSTER_NAME);
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller = new ClusterControllerManager(_zkAddr, CLUSTER_NAME, controllerName);
_controller.syncStart();
HelixClusterVerifier clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(_zkAddr).build();
Assert.assertTrue(clusterVerifier.verify());
-
- _zkServerRef.set(_zkServer);
}
@Test
@@ -66,7 +69,7 @@ public class TestZkConnectionLost extends TaskTestBase {
System.setProperty("zk.session.timeout", "1000");
String queueName = TestHelper.getTestMethodName();
- startParticipants();
+ startParticipants(_zkAddr);
// Create a queue
LOG.info("Starting job-queue: " + queueName);
@@ -78,7 +81,6 @@ public class TestZkConnectionLost extends TaskTestBase {
restartZkServer();
WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
- // ensure job 1 is started before stop it
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
_driver.pollForWorkflowState(scheduledQueue, 10000, TaskState.COMPLETED);
}
@@ -91,7 +93,7 @@ public class TestZkConnectionLost extends TaskTestBase {
String queueName = TestHelper.getTestMethodName();
stopParticipants();
- startParticipants();
+ startParticipants(_zkAddr);
LOG.info("Starting job-queue: " + queueName);
JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000);
@@ -124,7 +126,7 @@ public class TestZkConnectionLost extends TaskTestBase {
TestHelper.stopZkServer(_zkServerRef.get());
Thread.sleep(300);
System.out.println("Restart ZK server");
- _zkServerRef.set(TestHelper.startZkServer(ZK_ADDR, null, false));
+ _zkServerRef.set(TestHelper.startZkServer(_zkAddr, null, false));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/aaa632f3/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index c377233..cab60ce 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -121,16 +121,30 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
}
protected void startParticipants() {
- startParticipants(_numNodes);
+ startParticipants(ZK_ADDR, _numNodes);
+ }
+
+ protected void startParticipants(String zkAddr) {
+ startParticipants(zkAddr, _numNodes);
}
protected void startParticipants(int numNodes) {
for (int i = 0; i < numNodes; i++) {
- startParticipant(i);
+ startParticipant(ZK_ADDR, i);
+ }
+ }
+
+ protected void startParticipants(String zkAddr, int numNodes) {
+ for (int i = 0; i < numNodes; i++) {
+ startParticipant(zkAddr, i);
}
}
protected void startParticipant(int i) {
+ startParticipant(ZK_ADDR, i);
+ }
+
+ protected void startParticipant(String zkAddr, int i) {
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@Override public Task createNewTask(TaskCallbackContext context) {
@@ -138,7 +152,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
}
});
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i] = new MockParticipantManager(zkAddr, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -165,12 +179,17 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
protected void createManagers() throws Exception {
+ createManagers(ZK_ADDR, CLUSTER_NAME);
+ }
+
+ protected void createManagers(String zkAddr, String clusterName) throws Exception {
_manager = HelixManagerFactory
- .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ .getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR, zkAddr);
_manager.connect();
_driver = new TaskDriver(_manager);
}
+
public void setSingleTestEnvironment() {
_numDbs = 1;
_numNodes = 1;