Posted to commits@helix.apache.org by zz...@apache.org on 2014/10/03 21:05:38 UTC

git commit: [HELIX-518] Add integration tests to ensure helix tasks work as expected during master failover, rb=26272

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 2ebfe7d19 -> 8c5e63ab2


[HELIX-518] Add integration tests to ensure helix tasks work as expected during master failover, rb=26272


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c5e63ab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c5e63ab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c5e63ab

Branch: refs/heads/helix-0.6.x
Commit: 8c5e63ab263d2cbdf1f17bb98335afb69974be99
Parents: 2ebfe7d
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 3 12:03:43 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Oct 3 12:03:43 2014 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  16 +-
 .../helix/integration/task/DummyTask.java       |  72 +++++++
 .../task/TestTaskRebalancerFailover.java        | 216 +++++++++++++++++++
 3 files changed, 289 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 546e3bb..81c2e3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -32,7 +32,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.IdealStateChangeListener;
@@ -60,7 +59,6 @@ import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -84,7 +82,7 @@ import org.apache.log4j.Logger;
  */
 public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
+    ControllerChangeListener, InstanceConfigChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   volatile boolean init = false;
   private final PipelineRegistry _registry;
@@ -307,18 +305,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   // callback
 
   @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-      NotificationContext changeContext) {
-    // logger.info("START: GenericClusterController.onExternalViewChange()");
-    // ClusterEvent event = new ClusterEvent("externalViewChange");
-    // event.addAttribute("helixmanager", changeContext.getManager());
-    // event.addAttribute("changeContext", changeContext);
-    // event.addAttribute("eventData", externalViewList);
-    // _eventQueue.put(event);
-    // logger.info("END: GenericClusterController.onExternalViewChange()");
-  }
-
-  @Override
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");

http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
new file mode 100644
index 0000000..b6054d0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+public class DummyTask implements Task {
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private final long _delay;
+  private volatile boolean _canceled;
+
+  public DummyTask(TaskCallbackContext context) {
+    JobConfig jobCfg = context.getJobConfig();
+    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    if (cfg == null) {
+      cfg = Collections.emptyMap();
+    }
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    long timeLeft;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        timeLeft = expiry - System.currentTimeMillis();
+        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+            : timeLeft));
+      }
+      sleep(50);
+    }
+    timeLeft = expiry - System.currentTimeMillis();
+    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
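
DummyTask above runs for a configurable period and honors cancel(), which is what lets the failover test below observe jobs while they are still in flight. The run time comes from the job command config map under the key "Timeout", falling back to 200 ms when the key is absent. A short sketch of how a job could override that default, assuming JobConfig.Builder exposes setJobCommandConfigMap as in the other 0.6.x task tests; the helper class and the 5000 ms value are illustrative, not part of this commit:

package org.apache.helix.integration.task;

import java.util.Collections;
import java.util.Map;

import org.apache.helix.task.JobConfig;

public class DummyTaskConfigExample {
  public static JobConfig.Builder slowDummyJob(String targetDb) {
    // "Timeout" is the key DummyTask reads; 5000 ms keeps each task running long enough
    // for a test to observe it while a failover happens.
    Map<String, String> cmdConfig = Collections.singletonMap("Timeout", "5000");
    return new JobConfig.Builder()
        .setCommand("DummyTask")
        .setTargetResource(targetDb)
        .setJobCommandConfigMap(cmdConfig);
  }
}
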

http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
new file mode 100644
index 0000000..b8e1c09
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -0,0 +1,216 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestTaskRebalancerFailover extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestTaskRebalancerFailover.class);
+
+  private final String _clusterName = TestHelper.getTestClassName();
+  private static final int _n = 5;
+  private static final int _p = 20;
+  private static final int _r = 3;
+  private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
+  private ClusterControllerManager _controller;
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    ClusterSetup setup = new ClusterSetup(_gZkClient);
+    setup.addCluster(_clusterName, true);
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      setup.addInstanceToCluster(_clusterName, instanceName);
+    }
+
+    // Set up target db
+    setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
+    setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("DummyTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new DummyTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = "controller";
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < _n; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+    }
+    _manager.disconnect();
+  }
+
+  @Test
+  public void test() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Enqueue jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    JobConfig.Builder job =
+        new JobConfig.Builder().setCommand("DummyTask")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    String job1Name = "masterJob";
+    LOG.info("Enqueuing job: " + job1Name);
+    _driver.enqueueJob(queueName, job1Name, job);
+
+    // check all tasks completed on MASTER
+    String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ExternalView ev =
+        accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
+    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedJob1);
+    Set<String> failOverPartitions = Sets.newHashSet();
+    for (int p = 0; p < _p; p++) {
+      String instanceName = ctx.getAssignedParticipant(p);
+      Assert.assertNotNull(instanceName);
+      String partitionName = ctx.getTargetForPartition(p);
+      Assert.assertNotNull(partitionName);
+      String state = ev.getStateMap(partitionName).get(instanceName);
+      Assert.assertNotNull(state);
+      Assert.assertEquals(state, "MASTER");
+      if (instanceName.equals("localhost_12918")) {
+        failOverPartitions.add(partitionName);
+      }
+    }
+
+    // enqueue another master job and fail localhost_12918
+    String job2Name = "masterJob2";
+    String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
+    LOG.info("Enqueuing job: " + job2Name);
+    _driver.enqueueJob(queueName, job2Name, job);
+
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    _participants[0].syncStop();
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+
+    // tasks previously assigned to localhost_12918 should be re-scheduled on new master
+    ctx = TaskUtil.getJobContext(_manager, namespacedJob2);
+    ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
+    for (int p = 0; p < _p; p++) {
+      String partitionName = ctx.getTargetForPartition(p);
+      Assert.assertNotNull(partitionName);
+      if (failOverPartitions.contains(partitionName)) {
+        String instanceName = ctx.getAssignedParticipant(p);
+        Assert.assertNotNull(instanceName);
+        Assert.assertNotSame(instanceName, "localhost_12918");
+        String state = ev.getStateMap(partitionName).get(instanceName);
+        Assert.assertNotNull(state);
+        Assert.assertEquals(state, "MASTER");
+      }
+    }
+
+    // Flush queue and check cleanup
+    _driver.flushQueue(queueName);
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    JobDag dag = workflowCfg.getJobDag();
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+  }
+}
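
TestUtil.pollForJobState and WorkflowGenerator are pre-existing helpers in the same test package and are not part of this diff. For readers following the test flow, a rough sketch of what such a polling helper can look like, assuming TaskUtil.getWorkflowContext and WorkflowContext.getJobState from the 0.6.x task API; the class name, timeout, and poll interval below are illustrative:

package org.apache.helix.integration.task;

import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowContext;
import org.testng.Assert;

public class PollJobStateSketch {
  // Poll the workflow context until the job reaches the expected state or a cap is hit.
  public static void pollForJobState(HelixManager manager, String workflow, String job,
      TaskState expected) throws InterruptedException {
    long deadline = System.currentTimeMillis() + 2 * 60 * 1000; // arbitrary 2-minute cap
    TaskState actual = null;
    while (System.currentTimeMillis() < deadline) {
      WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflow);
      actual = (ctx == null) ? null : ctx.getJobState(job);
      if (expected.equals(actual)) {
        return;
      }
      Thread.sleep(100);
    }
    Assert.fail("Job " + job + " did not reach " + expected + " (last state: " + actual + ")");
  }
}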