You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/10/03 21:05:38 UTC
git commit: [HELIX-518] Add integration tests to ensure helix tasks
work as expected during master failover, rb=26272
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 2ebfe7d19 -> 8c5e63ab2
[HELIX-518] Add integration tests to ensure helix tasks work as expected during master failover, rb=26272
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c5e63ab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c5e63ab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c5e63ab
Branch: refs/heads/helix-0.6.x
Commit: 8c5e63ab263d2cbdf1f17bb98335afb69974be99
Parents: 2ebfe7d
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 3 12:03:43 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Oct 3 12:03:43 2014 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 16 +-
.../helix/integration/task/DummyTask.java | 72 +++++++
.../task/TestTaskRebalancerFailover.java | 216 +++++++++++++++++++
3 files changed, 289 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 546e3bb..81c2e3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -32,7 +32,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
@@ -60,7 +59,6 @@ import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -84,7 +82,7 @@ import org.apache.log4j.Logger;
*/
public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
- ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
+ ControllerChangeListener, InstanceConfigChangeListener {
private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
volatile boolean init = false;
private final PipelineRegistry _registry;
@@ -307,18 +305,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// callback
@Override
- public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext) {
- // logger.info("START: GenericClusterController.onExternalViewChange()");
- // ClusterEvent event = new ClusterEvent("externalViewChange");
- // event.addAttribute("helixmanager", changeContext.getManager());
- // event.addAttribute("changeContext", changeContext);
- // event.addAttribute("eventData", externalViewList);
- // _eventQueue.put(event);
- // logger.info("END: GenericClusterController.onExternalViewChange()");
- }
-
- @Override
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
new file mode 100644
index 0000000..b6054d0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+/** Test task that idles for the "Timeout" job command config value in ms (200 by default). */
+public class DummyTask implements Task {
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private final long _delay; // total time run() waits, in ms
+  private volatile boolean _canceled; // set by cancel(), polled by run()
+
+  public DummyTask(TaskCallbackContext context) {
+    JobConfig jobCfg = context.getJobConfig();
+    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    if (cfg == null) {
+      cfg = Collections.emptyMap();
+    }
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    TaskResult.Status status = TaskResult.Status.COMPLETED;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled || Thread.currentThread().isInterrupted()) { // interrupt acts like cancel()
+        status = TaskResult.Status.CANCELED;
+        break;
+      }
+      sleep(50);
+    }
+    long timeLeft = Math.max(0L, expiry - System.currentTimeMillis()); // reported as result info
+    return new TaskResult(status, String.valueOf(timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag so run() can observe it
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
new file mode 100644
index 0000000..b8e1c09
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -0,0 +1,216 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+/** Checks that MASTER-targeted jobs complete on the new masters when a participant stops. */
+public class TestTaskRebalancerFailover extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestTaskRebalancerFailover.class);
+
+  private final String _clusterName = TestHelper.getTestClassName();
+  private static final int _n = 5; // number of participants
+  private static final int _p = 20; // partition count passed for the target db
+  private static final int _r = 3; // replica count passed to the rebalance call
+  private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
+  private ClusterControllerManager _controller;
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    ClusterSetup setup = new ClusterSetup(_gZkClient);
+    setup.addCluster(_clusterName, true);
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      setup.addInstanceToCluster(_clusterName, instanceName);
+    }
+
+    // Set up target db
+    setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
+    setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("DummyTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new DummyTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = "controller";
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < _n; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+    }
+    _manager.disconnect();
+  }
+
+  /** Runs a job to learn task placement, then stops one participant while a second job runs. */
+  @Test
+  public void test() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+    String failedInstance = _participants[0].getInstanceName(); // the participant stopped below
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Enqueue jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    JobConfig.Builder job =
+        new JobConfig.Builder().setCommand("DummyTask")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    String job1Name = "masterJob";
+    LOG.info("Enqueuing job: " + job1Name);
+    _driver.enqueueJob(queueName, job1Name, job);
+
+    // check all tasks completed on MASTER
+    String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ExternalView ev =
+        accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
+    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedJob1);
+    Set<String> failOverPartitions = Sets.newHashSet();
+    for (int p = 0; p < _p; p++) {
+      String instanceName = ctx.getAssignedParticipant(p);
+      Assert.assertNotNull(instanceName);
+      String partitionName = ctx.getTargetForPartition(p);
+      Assert.assertNotNull(partitionName);
+      String state = ev.getStateMap(partitionName).get(instanceName);
+      Assert.assertNotNull(state);
+      Assert.assertEquals(state, "MASTER");
+      if (instanceName.equals(failedInstance)) {
+        failOverPartitions.add(partitionName);
+      }
+    }
+
+    // enqueue another master job and fail the first participant (localhost_12918)
+    String job2Name = "masterJob2";
+    String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
+    LOG.info("Enqueuing job: " + job2Name);
+    _driver.enqueueJob(queueName, job2Name, job);
+
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    _participants[0].syncStop();
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+
+    // tasks previously assigned to the failed participant should be re-scheduled on new master
+    ctx = TaskUtil.getJobContext(_manager, namespacedJob2);
+    ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
+    for (int p = 0; p < _p; p++) {
+      String partitionName = ctx.getTargetForPartition(p);
+      Assert.assertNotNull(partitionName);
+      if (failOverPartitions.contains(partitionName)) {
+        String instanceName = ctx.getAssignedParticipant(p);
+        Assert.assertNotNull(instanceName);
+        Assert.assertFalse(failedInstance.equals(instanceName)); // by value, not by reference
+        String state = ev.getStateMap(partitionName).get(instanceName);
+        Assert.assertNotNull(state);
+        Assert.assertEquals(state, "MASTER");
+      }
+    }
+
+    // Flush queue and check cleanup
+    _driver.flushQueue(queueName);
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    JobDag dag = workflowCfg.getJobDag();
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+  }
+}