Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/03/31 18:19:37 UTC

[GitHub] [helix] alirezazamani opened a new pull request #923: Fix the scheduling decision for multiple currentStates

alirezazamani opened a new pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923
 
 
   ### Issues
   - [x] My PR addresses the following Helix issues and references them in the PR title:
   Fixes #461
   Fixes #922
   
   ### Description
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   This commit targets the scenario where multiple currentStates exist for one partition and a previous assignment already exists for it. Because paMap holds only one mapping per partition, the final scheduling result is not deterministic, and tasks can get stuck with INIT -> RUNNING and RUNNING -> DROPPED transitions issued against the wrong instance. This PR fixes that scenario.
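   Roughly, the fix adds a guard so that a current state reported by an instance other than the assigned participant cannot overwrite the single paMap entry. A simplified sketch of the guard in AbstractTaskDispatcher#updatePreviousAssignedTasksStatus (the exact code is in the diff reviewed below):

   ```java
   // Simplified sketch only -- the committed change may differ in details.
   TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
       jobResource, pId, pName, instance, jobCtx, jobTgtState);

   // Only the instance recorded as the assigned participant may drive the transition;
   // a current state reported by any other (stale) replica is logged and skipped.
   if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
     LOG.warn("Current state from {} ignored for pId {}: not the assigned participant",
         instance, pId);
     continue;
   }
   ```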
   
   ### Tests
   - [x] The following tests are written for this issue:
   TestTargetedTaskStateChange
   
   - [x] The following is the result of the "mvn test" command on the appropriate module:
   Test Result 1: mvn test
   Failed tests:
     TestWorkflowTermination.testWorkflowPausedTimeout:170->verifyWorkflowCleanup:257 expected:<true> but was:<false>
   
   Tests run: 1094, Failures: 1, Errors: 0, Skipped: 0
   
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD FAILURE
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  01:13 h
   [INFO] Finished at: 2020-03-30T17:11:26-07:00
   [INFO] ------------------------------------------------------------------------
   
   The failed test passed when I ran it individually.
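   (For reference, a single class can typically be rerun on its own via Surefire's test filter, e.g. `mvn test -Dtest=TestWorkflowTermination -pl helix-core`.)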
   
   ### Commits
   
   - [x] My commits all reference appropriate Apache Helix GitHub issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - [x] My diff has been formatted using helix-style.xml
   
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] pkuwm commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
pkuwm commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401134987
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
 ##########
 @@ -115,6 +115,12 @@ public void updatePreviousAssignedTasksStatus(
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
+        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
+          LOG.warn(String.format(
 
 Review comment:
   I suggest using {}-style parameters.
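   For example, assuming LOG here is an SLF4J logger, the parameters can be passed directly instead of pre-formatting the message (message text is illustrative):

   ```java
   // String.format builds the message eagerly, even when WARN is disabled:
   LOG.warn(String.format("Requested state transition on %s for pId %s", instance, pId));

   // {}-style defers formatting until the message is actually emitted:
   LOG.warn("Requested state transition on {} for pId {}", instance, pId);
   ```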


[GitHub] [helix] narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401974747
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,348 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there is one current state which is
+   * different from
+   * Previous Assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
+
+    return workflowConfig;
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    Set<Integer> _taskPartitionSet;
+    Map<Integer, TaskPartitionState> _taskPartitionStateMap;
+    Map<Integer, String> _partitionToTaskIDMap;
+    Map<Integer, String> _taskToInstanceMap;
+    _taskPartitionSet = new HashSet<>();
+    _taskPartitionStateMap = new HashMap<>();
+    _partitionToTaskIDMap = new HashMap<>();
+    _taskToInstanceMap = new HashMap<>();
 
 Review comment:
   Why do these local variables have underscores?
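   (For reference, the underscore prefix is used for member fields elsewhere in this test, e.g. _liveInstances; locals would normally be plain camelCase. A hypothetical rename of one of them:)

   ```java
   // Local variable without the field-style underscore prefix.
   Map<Integer, TaskPartitionState> taskPartitionStateMap = new HashMap<>();
   taskPartitionStateMap.put(0, TaskPartitionState.RUNNING);
   ```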


[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401279901
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,348 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    Mock mock = new Mock();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there is one current state which is
+   * different from
+   * Previous Assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    Mock mock = new Mock();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
+
+    return workflowConfig;
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    Set<Integer> _taskPartitionSet;
+    Map<Integer, TaskPartitionState> _taskPartitionStateMap;
+    Map<Integer, String> _partitionToTaskIDMap;
+    Map<Integer, String> _taskToInstanceMap;
+    _taskPartitionSet = new HashSet<>();
+    _taskPartitionStateMap = new HashMap<>();
+    _partitionToTaskIDMap = new HashMap<>();
+    _taskToInstanceMap = new HashMap<>();
+
+    _taskPartitionSet.add(0);
+    _taskPartitionStateMap.put(0, TaskPartitionState.RUNNING);
+    _partitionToTaskIDMap.put(0, "0");
+    String someInstance = INSTANCE_PREFIX + 0;
+    _taskToInstanceMap.put(0, someInstance);
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L);
+    jobContext.setPartitionState(0, TaskPartitionState.RUNNING);
+    jobContext.setPartitionTarget(0, instance);
+    jobContext.setPartitionTarget(0, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    record.setSimpleField(IdealState.IdealStateProperty.NUM_PARTITIONS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), "true");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "TASK");
+    record.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "Task");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_FACTORY_NAME.name(), "DEFAULT");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.task.JobRebalancer");
+    record.setMapField(JOB_NAME + "_" + PARTITION_NAME, new HashMap<>());
+    record.setListField(JOB_NAME + "_" + PARTITION_NAME, new ArrayList<>());
+    Map<String, IdealState> idealStates = new HashMap<>();
+    idealStates.put(JOB_NAME, new IdealState(record));
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO_REBALANCE");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "MasterSlave");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+
+  private CurrentStateOutput prepareCurrentState(String masterInstance, String slaveInstance,
+      String masterState, String slaveState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, slaveInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, slaveInstance, slaveState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, slaveInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private CurrentStateOutput prepareCurrentState2(String masterInstance, String masterState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
+    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instance, state);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    return prevAssignment;
+  }
+
+  private class Mock {
 
 Review comment:
   Done.



[GitHub] [helix] dasahcc merged pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
dasahcc merged pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923
 
 
   


[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402011381
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
 ##########
 @@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private PropertyKey.Builder _keyBuilder;
+  private static final AtomicInteger CANCEL_COUNT = new AtomicInteger(0);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+    }
+
+    // Check that participants are actually stopped
+    for (int i = 0; i < _numNodes; i++) {
+      Assert.assertFalse(_participants[i].isConnected());
+    }
 
 Review comment:
   Done.



[GitHub] [helix] narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401974146
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
 ##########
 @@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private PropertyKey.Builder _keyBuilder;
+  private static final AtomicInteger CANCEL_COUNT = new AtomicInteger(0);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+    }
+
+    // Check that participants are actually stopped
+    for (int i = 0; i < _numNodes; i++) {
+      Assert.assertFalse(_participants[i].isConnected());
+    }
 
 Review comment:
   Possible to merge these two loops?
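   For example, a sketch based on the quoted code:

   ```java
   // Stop each participant started by the superclass and verify the disconnect in one pass.
   for (int i = 0; i < _numNodes; i++) {
     super.stopParticipant(i);
     Assert.assertFalse(_participants[i].isConnected());
   }
   ```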



[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402015442
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,330 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there is one current state which is
+   * different from
+   * Previous Assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    return workflowConfigBuilder.build();
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L);
+    jobContext.setPartitionState(0, TaskPartitionState.RUNNING);
+    jobContext.setPartitionTarget(0, instance);
+    jobContext.setPartitionTarget(0, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    record.setSimpleField(IdealState.IdealStateProperty.NUM_PARTITIONS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), "true");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "TASK");
+    record.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "Task");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_FACTORY_NAME.name(), "DEFAULT");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.task.JobRebalancer");
+    record.setMapField(JOB_NAME + "_" + PARTITION_NAME, new HashMap<>());
+    record.setListField(JOB_NAME + "_" + PARTITION_NAME, new ArrayList<>());
+    Map<String, IdealState> idealStates = new HashMap<>();
+    idealStates.put(JOB_NAME, new IdealState(record));
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO_REBALANCE");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "MasterSlave");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+
+  private CurrentStateOutput prepareCurrentState(String masterInstance, String slaveInstance,
+      String masterState, String slaveState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, slaveInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, slaveInstance, slaveState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, slaveInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private CurrentStateOutput prepareCurrentState2(String masterInstance, String masterState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
+    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instance, state);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    return prevAssignment;
+  }
+
+  private class MockTestInformation {
+    private String slaveInstance = INSTANCE_PREFIX + "0";
+    private String masterInstance = INSTANCE_PREFIX + "1";
+    private String slaveInstance2 = INSTANCE_PREFIX + "2";
+    private WorkflowControllerDataProvider cache = mock(WorkflowControllerDataProvider.class);
+    private WorkflowConfig _workflowConfig = prepareWorkflowConfig();
+    private WorkflowContext _workflowContext = prepareWorkflowContext();
+    private Map<String, IdealState> _idealStates =
+        prepareIdealStates(masterInstance, slaveInstance, slaveInstance2);
+    private JobConfig _jobConfig = prepareJobConfig();
+    private JobContext _jobContext = prepareJobContext(slaveInstance);
+    private BestPossibleStateOutput _bestPossibleStateOutput = mock(BestPossibleStateOutput.class);
+    private CurrentStateOutput _currentStateOutput = prepareCurrentState(masterInstance,
+        slaveInstance, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
+    private CurrentStateOutput _currentStateOutput2 =
+        prepareCurrentState2(masterInstance, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment =
+        preparePreviousAssignment(slaveInstance, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment2 =
+        preparePreviousAssignment(slaveInstance, TaskPartitionState.DROPPED.name());
+    private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
+    private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);
 
 Review comment:
   Good point. Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401973848
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
 ##########
 @@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
 
 Review comment:
   "private static final String" for constants?

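  As a quick illustration of that suggestion (a minimal sketch; it assumes the field keeps the same name and initializer shown in the quoted diff):

    private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;

  Making the field static and final treats it as a shared compile-time constant rather than a per-instance field, which also matches the all-caps naming already used for it.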
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402009813
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,330 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there is one current state which is
+   * different from
+   * Previous Assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    return workflowConfigBuilder.build();
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setStartTime(0L);
+    jobContext.setPartitionState(0, TaskPartitionState.RUNNING);
+    jobContext.setPartitionTarget(0, instance);
+    jobContext.setPartitionTarget(0, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    record.setSimpleField(IdealState.IdealStateProperty.NUM_PARTITIONS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), "true");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "TASK");
+    record.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "Task");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_FACTORY_NAME.name(), "DEFAULT");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.task.JobRebalancer");
+    record.setMapField(JOB_NAME + "_" + PARTITION_NAME, new HashMap<>());
+    record.setListField(JOB_NAME + "_" + PARTITION_NAME, new ArrayList<>());
+    Map<String, IdealState> idealStates = new HashMap<>();
+    idealStates.put(JOB_NAME, new IdealState(record));
+
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO_REBALANCE");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "MasterSlave");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+
+  private CurrentStateOutput prepareCurrentState(String masterInstance, String slaveInstance,
+      String masterState, String slaveState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, slaveInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, slaveInstance, slaveState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, slaveInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private CurrentStateOutput prepareCurrentState2(String masterInstance, String masterState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
+  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
+    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instance, state);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    return prevAssignment;
+  }
+
+  private class MockTestInformation {
+    private String slaveInstance = INSTANCE_PREFIX + "0";
+    private String masterInstance = INSTANCE_PREFIX + "1";
+    private String slaveInstance2 = INSTANCE_PREFIX + "2";
+    private WorkflowControllerDataProvider cache = mock(WorkflowControllerDataProvider.class);
+    private WorkflowConfig _workflowConfig = prepareWorkflowConfig();
+    private WorkflowContext _workflowContext = prepareWorkflowContext();
+    private Map<String, IdealState> _idealStates =
+        prepareIdealStates(masterInstance, slaveInstance, slaveInstance2);
+    private JobConfig _jobConfig = prepareJobConfig();
+    private JobContext _jobContext = prepareJobContext(slaveInstance);
+    private BestPossibleStateOutput _bestPossibleStateOutput = mock(BestPossibleStateOutput.class);
+    private CurrentStateOutput _currentStateOutput = prepareCurrentState(masterInstance,
+        slaveInstance, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
+    private CurrentStateOutput _currentStateOutput2 =
+        prepareCurrentState2(masterInstance, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment =
+        preparePreviousAssignment(slaveInstance, TaskPartitionState.RUNNING.name());
+    private ResourceAssignment _resourceAssignment2 =
+        preparePreviousAssignment(slaveInstance, TaskPartitionState.DROPPED.name());
+    private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
+    private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);
 
 Review comment:
  Let's be consistent with underscores here as well? Some have them, others don't.

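  For illustration, applying the underscore convention to the fields quoted above would look roughly like this (these are only renames of the existing members, not new fields):

    private String _slaveInstance = INSTANCE_PREFIX + "0";
    private String _masterInstance = INSTANCE_PREFIX + "1";
    private String _slaveInstance2 = INSTANCE_PREFIX + "2";
    private WorkflowControllerDataProvider _cache = mock(WorkflowControllerDataProvider.class);

  i.e. slaveInstance, masterInstance, slaveInstance2 and cache would pick up the same leading underscore already used by _workflowConfig, _jobConfig and the other members of MockTestInformation.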
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401279967
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
 ##########
 @@ -115,6 +115,12 @@ public void updatePreviousAssignedTasksStatus(
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
+        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
+          LOG.warn(String.format(
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401973848
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
 ##########
 @@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
 
 Review comment:
   "private static final String" for constants? Or just use DEFAULT_TGT_DB directly.

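  A small sketch of the second option (dropping the local DATABASE field and referencing the generator constant at the call sites; the builder call below is an assumed usage site, not a quote from the test):

    JobConfig.Builder jobBuilder =
        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB);

  Either way the value stays a single shared constant instead of a per-test-instance field.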
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402011463
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,348 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there is one current state which is
+   * different from
+   * Previous Assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation mock = new MockTestInformation();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
+
+    return workflowConfig;
+  }
+
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    Set<Integer> _taskPartitionSet;
+    Map<Integer, TaskPartitionState> _taskPartitionStateMap;
+    Map<Integer, String> _partitionToTaskIDMap;
+    Map<Integer, String> _taskToInstanceMap;
+    _taskPartitionSet = new HashSet<>();
+    _taskPartitionStateMap = new HashMap<>();
+    _partitionToTaskIDMap = new HashMap<>();
+    _taskToInstanceMap = new HashMap<>();
 
 Review comment:
  I removed them. I realized we do not need them.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402011353
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
 ##########
 @@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to check if targeted tasks correctly get assigned and also if cancel messages are not being
+ * sent when there are two CurrentStates.
+ */
+public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
+  private final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] pkuwm commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
pkuwm commented on a change in pull request #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r401136759
 
 

 ##########
 File path: helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,348 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * This test checks the behaviour of the controller while there are two current states for two
+   * different instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    Mock mock = new Mock();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
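+    // With RUNNING current states on both instances, the replica on Instance_0 (a SLAVE for the
+    // target partition) is expected to be dropped.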
+    Assert.assertEquals(TaskPartitionState.DROPPED.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "0"));
+  }
+
+  /**
+   * This test checks the behaviour of the controller when there is a single current state that
+   * differs from the previous assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
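+    // Same wiring as the previous test, but with a single RUNNING current state and a previous
+    // assignment already marked as DROPPED.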
+    Mock mock = new Mock();
+    when(mock.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(mock._workflowConfig);
+    when(mock.cache.getJobConfig(JOB_NAME)).thenReturn(mock._jobConfig);
+    when(mock.cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
+    when(mock.cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
+    when(mock.cache.getIdealStates()).thenReturn(mock._idealStates);
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(mock._resourceAssignment2);
+    when(mock.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(mock.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(mock.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(mock.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, mock._workflowConfig,
+        mock._workflowContext, mock._currentStateOutput2, bestPossibleStateOutput);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
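+    // The task is expected to be scheduled as RUNNING on Instance_1, which hosts the MASTER
+    // replica of the target partition.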
+    Assert.assertEquals(TaskPartitionState.RUNNING.name(), bestPossibleStateOutput
+        .getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition).get(INSTANCE_PREFIX + "1"));
+  }
+
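+  // Builds a non-terminable job-queue workflow config containing the single test job.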
+  private WorkflowConfig prepareWorkflowConfig() {
+    WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
+    workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
+    workflowConfigBuilder.setTerminable(false);
+    workflowConfigBuilder.setTargetState(TargetState.START);
+    workflowConfigBuilder.setJobQueue(true);
+    JobDag jobDag = new JobDag();
+    jobDag.addNode(JOB_NAME);
+    workflowConfigBuilder.setJobDag(jobDag);
+    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
+
+    return workflowConfig;
+  }
+
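+  // Builds a targeted job config with one task that must run on the MASTER replica of TestDB_0.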
+  private JobConfig prepareJobConfig() {
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
+    jobConfigBuilder.setCommand("TestCommand");
+    jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
+    jobConfigBuilder.setJobId(JOB_NAME);
+    List<String> targetPartition = new ArrayList<>();
+    targetPartition.add(TARGET_RESOURCES + "_0");
+    jobConfigBuilder.setTargetPartitions(targetPartition);
+    Set<String> targetPartitionStates = new HashSet<>();
+    targetPartitionStates.add("MASTER");
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    taskConfigBuilder.setTaskId("0");
+    taskConfigs.add(taskConfigBuilder.build());
+    jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
+    jobConfigBuilder.addTaskConfigs(taskConfigs);
+    JobConfig jobConfig = jobConfigBuilder.build();
+    return jobConfig;
+  }
+
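+  // Builds a workflow context with the workflow and the test job both IN_PROGRESS.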
+  private WorkflowContext prepareWorkflowContext() {
+    ZNRecord record = new ZNRecord(WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
+        TaskState.IN_PROGRESS.name());
+    Map<String, String> jobState = new HashMap<>();
+    jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+    record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
+    return new WorkflowContext(record);
+  }
+
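+  // Builds a job context that marks partition 0 as RUNNING against the target DB partition.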
+  private JobContext prepareJobContext(String instance) {
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    JobContext jobContext = new JobContext(record);
+    jobContext.setStartTime(0L);
+    jobContext.setName(JOB_NAME);
+    jobContext.setPartitionState(0, TaskPartitionState.RUNNING);
+    jobContext.setPartitionTarget(0, TARGET_RESOURCES + "_0");
+    return jobContext;
+  }
+
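+  // Builds ideal states for both the task job resource and the target DB resource.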
+  private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
+      String instance3) {
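+    // Ideal state for the job resource itself (TASK state model, single partition).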
+    ZNRecord record = new ZNRecord(JOB_NAME);
+    record.setSimpleField(IdealState.IdealStateProperty.NUM_PARTITIONS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.EXTERNAL_VIEW_DISABLED.name(), "true");
+    record.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "TASK");
+    record.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "1");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "Task");
+    record.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_FACTORY_NAME.name(), "DEFAULT");
+    record.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.task.JobRebalancer");
+    record.setMapField(JOB_NAME + "_" + PARTITION_NAME, new HashMap<>());
+    record.setListField(JOB_NAME + "_" + PARTITION_NAME, new ArrayList<>());
+    Map<String, IdealState> idealStates = new HashMap<>();
+    idealStates.put(JOB_NAME, new IdealState(record));
+
+    // Ideal state for the target DB resource (MasterSlave, three replicas).
+    ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(), "AUTO_REBALANCE");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(), "MasterSlave");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name(),
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+    recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
+        "org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put(instance1, "MASTER");
+    mapping.put(instance2, "SLAVE");
+    mapping.put(instance3, "SLAVE");
+    recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
+    List<String> listField = new ArrayList<>();
+    listField.add(instance1);
+    listField.add(instance2);
+    listField.add(instance3);
+    recordDB.setListField(TARGET_RESOURCES + "_0", listField);
+    idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
+
+    return idealStates;
+  }
+
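+  // Builds current states for the task on two instances, plus a MASTER current state for the
+  // target DB partition.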
+  private CurrentStateOutput prepareCurrentState(String masterInstance, String slaveInstance,
+      String masterState, String slaveState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, slaveInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, slaveInstance, slaveState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, slaveInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
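+  // Same as above, but the task has a current state on a single instance only.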
+  private CurrentStateOutput prepareCurrentState2(String masterInstance, String masterState) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
+    currentStateOutput.setBucketSize(JOB_NAME, 0);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    currentStateOutput.setEndTime(JOB_NAME, taskPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance, masterState);
+    currentStateOutput.setInfo(JOB_NAME, taskPartition, masterInstance, "");
+    currentStateOutput.setResourceStateModelDef(TARGET_RESOURCES, "MasterSlave");
+    currentStateOutput.setBucketSize(TARGET_RESOURCES, 0);
+    Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
+    currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
+    currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
+    currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
+    return currentStateOutput;
+  }
+
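+  // Builds the previous task assignment, recording the instance and state the task replica was
+  // last assigned to.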
+  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
+    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(instance, state);
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    prevAssignment.addReplicaMap(taskPartition, replicaMap);
+    return prevAssignment;
+  }
+
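+  // Container for the mocked data provider cache and the helper-built configs, contexts,
+  // current states, and previous assignments shared by both tests.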
+  private class Mock {
 
 Review comment:
   Can we have a more descriptive name for this class?


[GitHub] [helix] alirezazamani commented on issue #923: Fix the scheduling decision for multiple currentStates

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on issue #923: Fix the scheduling decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#issuecomment-607460166
 
 
   This PR is ready to be merged; it has been approved by @dasahcc.
   
   Final commit message:
   Fix the scheduling decision for multiple currentStates and prevAssignment
   
   This commit addresses the problem of tasks being scheduled on, and
   dropped from, the slave instance.
   Multiple tests have been added.
