You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:26:58 UTC
[02/33] helix git commit: More fixes and cleanup on task unit tests.
More fixes and cleanup on task unit tests.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/03b90012
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/03b90012
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/03b90012
Branch: refs/heads/helix-0.6.x
Commit: 03b90012939e99eb3556c715ff8b8eaab710bb79
Parents: 760f8e3
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Jan 20 15:33:20 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:34:12 2016 -0700
----------------------------------------------------------------------
.../task/TestDisableJobExternalView.java | 210 +++++++++++++++++++
.../integration/task/TestRecurringJobQueue.java | 74 +------
.../task/TestRunJobsWithMissingTarget.java | 7 +-
.../integration/task/TestTaskRebalancer.java | 3 +-
.../task/TestTaskRebalancerFailover.java | 2 +-
.../task/TestTaskRebalancerParallel.java | 3 +-
.../task/TestTaskRebalancerRetryLimit.java | 2 +-
.../task/TestTaskRebalancerStopResume.java | 13 +-
8 files changed, 222 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
new file mode 100644
index 0000000..b23e268
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
@@ -0,0 +1,210 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestDisableJobExternalView extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestDisableJobExternalView.class);
+ private static final int n = 5;
+ private static final int START_PORT = 12918;
+ private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+ private static final String TGT_DB = "TestDB";
+ private static final int NUM_PARTITIONS = 20;
+ private static final int NUM_REPLICAS = 3;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+ private ClusterControllerManager _controller;
+
+ private HelixManager _manager;
+ private TaskDriver _driver;
+
+ /**
+  * One-time setup for this test class.
+  *
+  * Creates the cluster, adds {@code n} instances and the target resource, starts one mock
+  * participant per instance (each with a "Task" state model factory registered) and one
+  * controller, connects an ADMINISTRATOR manager, builds the {@link TaskDriver}, and finally
+  * asserts that both cluster-state verifiers succeed.
+  *
+  * @throws Exception if any setup step fails
+  */
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // Remove any existing ZooKeeper subtree for this cluster name before creating the cluster.
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addCluster(CLUSTER_NAME, true);
+ // Instance names are PARTICIPANT_PREFIX + "_" + port, with ports START_PORT .. START_PORT + n - 1.
+ for (int i = 0; i < n; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // Set up target db
+ // NOTE(review): the test method targets WorkflowGenerator.DEFAULT_TGT_DB rather than TGT_DB;
+ // presumably both name the same resource -- confirm.
+ setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+ setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+
+ // Register a factory under MockTask.TASK_COMMAND that returns a new MockTask for each
+ // callback context; the same registry is shared by every participant below.
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
+
+ // start dummy participants
+ for (int i = 0; i < n; i++) {
+ // Same naming scheme as the instances added to the cluster above.
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+ taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // create cluster manager
+ _manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+ ZK_ADDR);
+ _manager.connect();
+
+ _driver = new TaskDriver(_manager);
+
+ // Setup fails unless both cluster-state verifiers return true.
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+ ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+  /**
+   * One-time teardown for this test class: disconnects the admin manager first, then stops
+   * every participant, and stops the controller last.
+   *
+   * @throws Exception if any shutdown step fails
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _controller.syncStop();
+  }
+
+
+  /**
+   * Checks which jobs in a queue have their external view observed by a listener.
+   *
+   * Three jobs are enqueued: job1 leaves the disable-external-view setting untouched, job2 sets
+   * it to {@code true}, and job3 sets it explicitly to {@code false}. After job3 reaches
+   * {@link TaskState#COMPLETED}, the test expects external views to have been seen for job1 and
+   * job3 but not for job2.
+   *
+   * @throws Exception if queue creation or polling fails
+   */
+  @Test
+  public void testJobsDisableExternalView() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    ExternviewChecker externviewChecker = new ExternviewChecker();
+    _manager.addExternalViewChangeListener(externviewChecker);
+    try {
+      // Create a queue
+      LOG.info("Starting job-queue: " + queueName);
+      JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+
+      JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+
+      JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
+
+      JobConfig.Builder job3 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
+
+      // enqueue jobs
+      queueBuilder.enqueueJob("job1", job1);
+      queueBuilder.enqueueJob("job2", job2);
+      queueBuilder.enqueueJob("job3", job3);
+
+      _driver.createQueue(queueBuilder.build());
+
+      // Only the last enqueued job is polled.
+      // NOTE(review): presumably the queue runs jobs in enqueue order, so job3 completing
+      // implies job1 and job2 have already run -- confirm.
+      String namedSpaceJob3 = String.format("%s_%s", queueName, "job3");
+      TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob3, TaskState.COMPLETED);
+
+      Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
+      String namedSpaceJob1 = String.format("%s_%s", queueName, "job1");
+      String namedSpaceJob2 = String.format("%s_%s", queueName, "job2");
+
+      Assert.assertTrue(seenExternalViews.contains(namedSpaceJob1),
+          "Can not find external View for " + namedSpaceJob1 + "!");
+      Assert.assertFalse(seenExternalViews.contains(namedSpaceJob2),
+          "External View for " + namedSpaceJob2 + " shoudld not exist!");
+      Assert.assertTrue(seenExternalViews.contains(namedSpaceJob3),
+          "Can not find external View for " + namedSpaceJob3 + "!");
+    } finally {
+      // Unregister even when polling or an assertion fails, so the listener does not stay
+      // attached to the manager shared by the whole test class.
+      _manager
+          .removeListener(new PropertyKey.Builder(CLUSTER_NAME).externalViews(), externviewChecker);
+    }
+  }
+
+  /**
+   * External-view listener that records the resource name of every external view it is
+   * notified about.
+   *
+   * NOTE(review): the callback is presumably invoked on a Helix callback thread rather than the
+   * test thread -- confirm. The set is synchronized so that names recorded by the callback are
+   * safely visible to the thread that calls {@link #getSeenExternalViews()}.
+   */
+  private static class ExternviewChecker implements ExternalViewChangeListener {
+    private final Set<String> _seenExternalViews =
+        java.util.Collections.synchronizedSet(new HashSet<String>());
+
+    /**
+     * Adds the resource name of each external view in the notification to the seen set.
+     *
+     * @param externalViewList external views delivered with this notification
+     * @param changeContext notification context (unused)
+     */
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      for (ExternalView view : externalViewList) {
+        _seenExternalViews.add(view.getResourceName());
+      }
+    }
+
+    /**
+     * Returns a snapshot of the resource names seen so far.
+     *
+     * @return a copy of the seen set; later notifications do not modify it
+     */
+    public Set<String> getSeenExternalViews() {
+      // Iterating a synchronized set (as the copy constructor does) requires holding its lock.
+      synchronized (_seenExternalViews) {
+        return new HashSet<String>(_seenExternalViews);
+      }
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4e21ef7..cb44f0e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -21,17 +21,13 @@ package org.apache.helix.integration.task;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
@@ -40,7 +36,6 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.ExternalView;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -150,11 +145,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
for (int i = 0; i < n; i++) {
_participants[i].syncStop();
}
- _manager.disconnect();
}
@@ -372,73 +367,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1)));
}
- @Test
- public void testJobsDisableExternalView() throws Exception {
- String queueName = TestHelper.getTestMethodName();
-
- ExternviewChecker externviewChecker = new ExternviewChecker();
- _manager.addExternalViewChangeListener(externviewChecker);
-
- // Create a queue
- LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
-
- JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
-
- JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
-
- JobConfig.Builder job3 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
-
- // enqueue both jobs
- queueBuilder.enqueueJob("job1", job1);
- queueBuilder.enqueueJob("job2", job2);
- queueBuilder.enqueueJob("job3", job3);
-
- _driver.createQueue(queueBuilder.build());
-
- WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
- String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
- // ensure all jobs are completed
- String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
- TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
-
- Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
- String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
- String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, "job2");
-
- Assert.assertTrue(seenExternalViews.contains(namedSpaceJob1),
- "Can not find external View for " + namedSpaceJob1 + "!");
- Assert.assertTrue(!seenExternalViews.contains(namedSpaceJob2),
- "External View for " + namedSpaceJob2 + " shoudld not exist!");
- Assert.assertTrue(seenExternalViews.contains(namedSpaceJob3),
- "Can not find external View for " + namedSpaceJob3 + "!");
-
- _manager
- .removeListener(new PropertyKey.Builder(CLUSTER_NAME).externalViews(), externviewChecker);
- }
-
- private static class ExternviewChecker implements ExternalViewChangeListener {
- private Set<String> _seenExternalViews = new HashSet<String>();
-
- @Override public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext) {
- for (ExternalView view : externalViewList) {
- _seenExternalViews.add(view.getResourceName());
- }
- }
-
- public Set<String> getSeenExternalViews() {
- return _seenExternalViews;
- }
- }
-
private void verifyJobDeleted(String queueName, String jobName) throws Exception {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index d8d2b60..31e4325 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -35,10 +35,8 @@ import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -48,8 +46,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,7 +56,6 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
private static final int num_dbs = 5;
private static final int START_PORT = 12918;
private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
- private static final String TIMEOUT_CONFIG = "Timeout";
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -139,11 +134,11 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
for (int i = 0; i < num_nodes; i++) {
_participants[i].syncStop();
}
- _manager.disconnect();
}
@Test public void testJobFailsWithMissingTarget() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 787ebcc..2d11f85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -136,14 +136,13 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
// _controller = null;
for (int i = 0; i < n; i++) {
_participants[i].syncStop();
// _participants[i] = null;
}
-
- _manager.disconnect();
}
@Test
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index 6f1c48e..a778dcd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -122,13 +122,13 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
for (int i = 0; i < _n; i++) {
if (_participants[i] != null && _participants[i].isConnected()) {
_participants[i].syncStop();
}
}
- _manager.disconnect();
}
@Test
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 5180a04..c9a0445 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -127,14 +127,13 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
// _controller = null;
for (int i = 0; i < n; i++) {
_participants[i].syncStop();
// _participants[i] = null;
}
-
- _manager.disconnect();
}
@Test
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index d25ffc5..8fec899 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -116,13 +116,13 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
for (int i = 0; i < _n; i++) {
if (_participants[i] != null && _participants[i].isConnected()) {
_participants[i].syncStop();
}
}
- _manager.disconnect();
}
@Test public void test() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/03b90012/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 471d130..7a8d305 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -150,11 +150,11 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
@AfterClass
public void afterClass() throws Exception {
+ _manager.disconnect();
_controller.syncStop();
for (int i = 0; i < n; i++) {
_participants[i].syncStop();
}
- _manager.disconnect();
}
@Test public void stopAndResume() throws Exception {
@@ -306,14 +306,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
_driver.resume(queueName);
// ensure job 2 is started
- TaskTestUtil.pollForJobState(_manager, queueName,
- String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+ TaskTestUtil
+ .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
// stop the queue
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TaskTestUtil.pollForJobState(_manager, queueName,
- String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+ TaskTestUtil
+ .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
// Ensure job 3 is not started before deleting it
@@ -334,8 +334,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
// add job 3 back
JobConfig.Builder job =
new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(Sets.newHashSet("SLAVE"));
LOG.info("Enqueuing job: " + deletedJob2);
_driver.enqueueJob(queueName, deletedJob2, job);
currentJobNames.add(deletedJob2);