You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/02 16:43:26 UTC

[helix] branch master updated: Fix the scheduling decision for multiple currentStates (#923)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 184a50a  Fix the scheduling decision for multiple currentStates (#923)
184a50a is described below

commit 184a50ac173c18e0ea3d40ffb5a0f93d31aa558f
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Thu Apr 2 09:43:15 2020 -0700

    Fix the scheduling decision for multiple currentStates (#923)
    
    Fix the scheduling decision for multiple currentStates and prevAssignment
    
    In this commit, the problem of scheduling and dropping the tasks
    on the slave node has been addressed.
    Multiple tests have been added.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  |  11 +-
 .../task/TestTaskSchedulingTwoCurrentStates.java   | 216 ++++++++++++++
 .../helix/task/TestTargetedTaskStateChange.java    | 329 +++++++++++++++++++++
 3 files changed, 555 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index fa4f249..bc06636 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -115,6 +115,12 @@ public abstract class AbstractTaskDispatcher {
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
+        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
+          LOG.warn(
+              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
+              instance, pId);
+          continue;
+        }
         // This avoids a race condition in the case that although currentState is in the following
         // error condition, the pending message (INIT->RUNNNING) might still be present.
         // This is undesirable because this prevents JobContext from getting the proper update of
@@ -383,6 +389,7 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    jobCtx.setAssignedParticipant(pId, instance);
     jobCtx.setPartitionState(pId, currentState);
     String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
     if (taskMsg != null) {
@@ -827,7 +834,9 @@ public abstract class AbstractTaskDispatcher {
               break;
             }
           }
-          if (existsInNewAssignment) {
+          if (existsInNewAssignment
+              && instance.equals(jobContext.getAssignedParticipant(pId))
+          ) {
             // We need to drop this task in the old assignment
             paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
             jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
new file mode 100644
index 0000000..5b63b44
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -0,0 +1,216 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private PropertyKey.Builder _keyBuilder;
+  private static final AtomicInteger CANCEL_COUNT = new AtomicInteger(0);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+
+    // Start new participants that have new TaskStateModel (NewMockTask) information
+    _participants = new MockParticipantManager[_numNodes];
+    for (int i = 0; i < _numNodes; i++) {
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+      taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);
+      String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @Test
+  public void testTargetedTaskTwoCurrentStates() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    _keyBuilder = _accessor.keyBuilder();
+    ClusterConfig clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+    clusterConfig.setPersistIntermediateAssignment(true);
+    clusterConfig.setRebalanceTimePeriod(10000L);
+    _accessor.setProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+    List<String> preferenceList = new ArrayList<>();
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    preferenceList.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+    // Change the Rebalance Mode to SEMI_AUTO
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DATABASE);
+    idealState.setPreferenceList(DATABASE + "_0", preferenceList);
+    idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, DATABASE,
+        idealState);
+
+    // [Participant0: localhost_12918, Participant1: localhost_12919, Participant2: localhost_12920]
+    // Preference list [localhost_12919, localhost_12918, localhost_12920]
+    // Status: [Participant1: Master, Participant0: Slave, Participant2: Slave]
+    // Based on the above preference list and since it is SEMI_AUTO, localhost_12919 will be Master.
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+
+    _driver.start(jobQueue.build());
+
+    String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, "JOB0");
+
+    _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.IN_PROGRESS);
+
+    // Task should be assigned to Master -> Participant1
+    boolean isTaskAssignedToMasterNode = TestHelper.verify(() -> {
+      JobContext ctx = _driver.getJobContext(namespacedJobName);
+      String participant = ctx.getAssignedParticipant(0);
+      if (participant == null) {
+        return false;
+      }
+      return (participant.equals(PARTICIPANT_PREFIX + "_" + (_startPort + 1)));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isTaskAssignedToMasterNode);
+
+    String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+    String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+    String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/"
+        + sessionIdP0 + "/" + namespacedJobName;
+
+    // Get the current state of Participant1
+    String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
+    ZkClient clientP1 = (ZkClient) _participants[1].getZkClient();
+    String sessionIdP1 = ZkTestHelper.getSessionId(clientP1);
+    String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/"
+        + sessionIdP1 + "/" + namespacedJobName;
+
+    boolean isCurrentStateCreated = TestHelper.verify(() -> {
+      ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+          .get(currentStatePathP1, new Stat(), AccessOption.PERSISTENT);
+      if (record != null) {
+        record.setSimpleField(CurrentState.CurrentStateProperty.SESSION_ID.name(), sessionIdP0);
+        _manager.getHelixDataAccessor().getBaseDataAccessor().set(currentStatePathP0, record,
+            AccessOption.PERSISTENT);
+        return true;
+      } else {
+        return false;
+      }
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isCurrentStateCreated);
+
+    String previousAssignmentPath = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
+        + namespacedJobName + "/PreviousResourceAssignment";
+    ResourceAssignment prevAssignment = new ResourceAssignment(namespacedJobName);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instanceP0, TaskPartitionState.RUNNING.name());
+    Partition taskPartition = new Partition(namespacedJobName + "_0");
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    _manager.getHelixDataAccessor().getBaseDataAccessor().set(previousAssignmentPath,
+        prevAssignment.getRecord(), AccessOption.PERSISTENT);
+
+    // Wait until the job is finished.
+    _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.COMPLETED);
+    Assert.assertEquals(CANCEL_COUNT.get(), 0);
+  }
+
+  /**
+   * A mock task that extends MockTask class to count the number of cancel messages.
+   */
+  private class NewMockTask extends MockTask {
+
+    NewMockTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public void cancel() {
+      // Increment the cancel count so we know cancel() has been called
+      CANCEL_COUNT.incrementAndGet();
+      super.cancel();
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
new file mode 100644
index 0000000..0428c81
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -0,0 +1,329 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock._cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock._cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock._cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller when there is a single current state and
+   * that current state is reported by a different instance than the one recorded in the
+   * previous assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance1: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock._cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock._cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock._cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    return workflowConfigBuilder.build();
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L); // NOTE(review): repeats the setStartTime(0L) call made two lines above
+    jobContext.setPartitionState(0, TaskPartitionState.RUNNING);
+    jobContext.setPartitionTarget(0, instance); // NOTE(review): target of partition 0 is set again on the next line, so `instance` is presumably discarded; was setAssignedParticipant intended? TODO confirm
+    jobContext.setPartitionTarget(0, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    record.setSimpleField(IdealState.IdealStateProperty.NUM_PARTITIONS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), "true");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "TASK");
+    record.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "Task");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_FACTORY_NAME.name(), "DEFAULT");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.task.JobRebalancer");
+    record.setMapField(JOB_NAME + "_" + PARTITION_NAME, new HashMap<>());
+    record.setListField(JOB_NAME + "_" + PARTITION_NAME, new ArrayList<>());
+    Map<String, IdealState> idealStates = new HashMap<>();
+    idealStates.put(JOB_NAME, new IdealState(record));
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO_REBALANCE"); // NOTE(review): written to `record` (the job record above), not `recordDB` - looks like a copy/paste slip; TODO confirm
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "MasterSlave"); // NOTE(review): also targets `record`, and the same key is set again by the next statement
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy"); // NOTE(review): a rebalance-strategy class name stored under STATE_MODEL_DEF_REF - presumably a different property was meant; TODO confirm
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer"); // NOTE(review): again `record`, not `recordDB`; whether this reaches the IdealState already built from `record` depends on IdealState copying its record - verify
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+
+  private CurrentStateOutput prepareCurrentState(String masterInstance, String slaveInstance,
+      String masterState, String slaveState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, slaveInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, slaveInstance, slaveState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, slaveInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private CurrentStateOutput prepareCurrentState2(String masterInstance, String masterState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
+    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instance, state);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    return prevAssignment;
+  }
+
+  private class MockTestInformation {
+    private static final String SLAVE_INSTANCE = INSTANCE_PREFIX + "0";
+    private static final String MASTER_INSTANCE = INSTANCE_PREFIX + "1";
+    private static final String SLAVE_INSTANCE_2 = INSTANCE_PREFIX + "2";
+
+    private WorkflowControllerDataProvider _cache = mock(WorkflowControllerDataProvider.class);
+    private WorkflowConfig _workflowConfig = prepareWorkflowConfig();
+    private WorkflowContext _workflowContext = prepareWorkflowContext();
+    private Map<String, IdealState> _idealStates =
+        prepareIdealStates(MASTER_INSTANCE, SLAVE_INSTANCE, SLAVE_INSTANCE_2);
+    private JobConfig _jobConfig = prepareJobConfig();
+    private JobContext _jobContext = prepareJobContext(SLAVE_INSTANCE);
+    private CurrentStateOutput _currentStateOutput = prepareCurrentState(MASTER_INSTANCE,
+        SLAVE_INSTANCE, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
+    private CurrentStateOutput _currentStateOutput2 =
+        prepareCurrentState2(MASTER_INSTANCE, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment =
+        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment2 =
+        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.DROPPED.name());
+    private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
+    private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);
+
+    MockTestInformation() {
+    }
+  }
+}