Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:43 UTC

[helix] 07/10: Respect Maximum Number Of Attempts for the tasks (#1142)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d39b4561080066993e04f54e83cba0cbcb9323be
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Jul 21 13:01:47 2020 -0700

    Respect Maximum Number Of Attempts for the tasks (#1142)
    
    In this commit, several parts of the scheduling logic have been changed
    in order to make the scheduler respect the maximum number of attempts
    for the tasks.
    
    It has also been observed that when a task is dropped and scheduled
    again, the maximum number of attempts is not respected. This commit
    adds further checks to avoid scheduling a task again once it has
    reached its maximum number of attempts.
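    
    As an illustration (not part of the patch below), the retry cap that
    this commit enforces is the one configured on the job via
    JobConfig.Builder#setMaxAttemptsPerTask. A minimal, hypothetical usage
    sketch; the workflow name and command are illustrative placeholders:
    
        // Hypothetical job whose tasks may be attempted at most 5 times.
        JobConfig.Builder jobBuilder = new JobConfig.Builder()
            .setWorkflow("myWorkflow")
            .setCommand("Reindex")
            .setMaxAttemptsPerTask(5); // cap enforced by the dispatcher changes below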
---
 .../WorkflowControllerDataProvider.java            |  18 +--
 .../apache/helix/task/AbstractTaskDispatcher.java  |  49 +++++--
 .../java/org/apache/helix/task/JobDispatcher.java  |   2 +-
 .../integration/task/TestForceDeleteWorkflow.java  |   6 +-
 .../task/TestMaxNumberOfAttemptsMasterSwitch.java  | 152 +++++++++++++++++++++
 .../helix/integration/task/TestStopWorkflow.java   |   6 +-
 .../helix/task/TestTargetedTaskStateChange.java    |   4 +-
 7 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 1032417..d5bc11e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -58,7 +58,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
 
   // For detecting live instance and target resource partition state change in task assignment
   // Used in AbstractTaskDispatcher
-  private boolean _existsLiveInstanceOrCurrentStateChange = false;
+  private boolean _existsLiveInstanceOrCurrentStateOrMessageChange = false;
 
   public WorkflowControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
@@ -71,12 +71,14 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
   }
 
   private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> propertyRefreshed) {
-    // This is for targeted jobs' task assignment. It needs to watch for current state changes for
-    // when targeted resources' state transitions complete
-    _existsLiveInstanceOrCurrentStateChange =
+    // This is for targeted jobs' task assignment. It needs to watch for current state or message
+    // changes for when targeted resources' state transitions complete
+    _existsLiveInstanceOrCurrentStateOrMessageChange =
         // TODO read and update CURRENT_STATE in the BaseControllerDataProvider as well.
-        // This check (and set) is necessary for now since the current state flag in _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
+        // This check (and set) is necessary for now since the current state flag in
+        // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
             || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
@@ -119,7 +121,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    _existsLiveInstanceOrCurrentStateChange = true;
+    _existsLiveInstanceOrCurrentStateOrMessageChange = true;
     super.setLiveInstances(liveInstances);
   }
 
@@ -257,8 +259,8 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
    * task-assigning in AbstractTaskDispatcher.
    * @return
    */
-  public boolean getExistsLiveInstanceOrCurrentStateChange() {
-    return _existsLiveInstanceOrCurrentStateChange;
+  public boolean getExistsLiveInstanceOrCurrentStateOrMessageChange() {
+    return _existsLiveInstanceOrCurrentStateOrMessageChange;
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index ffbdcef..904ecbd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -74,6 +75,12 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> tasksToDrop) {
 
+    // If a job is in one of the following states and its tasks are in the RUNNING state,
+    // those tasks will be aborted.
+    Set<TaskState> jobStatesForAbortingTasks =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED));
+
     // Get AssignableInstanceMap for releasing resources for tasks in terminal states
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
@@ -185,17 +192,11 @@ public abstract class AbstractTaskDispatcher {
         switch (currState) {
         case RUNNING: {
           TaskPartitionState nextState = TaskPartitionState.RUNNING;
-          if (jobState == TaskState.TIMING_OUT) {
+          if (jobStatesForAbortingTasks.contains(jobState)) {
             nextState = TaskPartitionState.TASK_ABORTED;
           } else if (jobTgtState == TargetState.STOP) {
             nextState = TaskPartitionState.STOPPED;
-          } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
-              || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
-            // Drop tasks if parent job is not in progress
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-            break;
           }
-
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
@@ -548,8 +549,8 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
-    boolean existsLiveInstanceOrCurrentStateChange =
-        cache.getExistsLiveInstanceOrCurrentStateChange();
+    boolean existsLiveInstanceOrCurrentStateOrMessageChange =
+        cache.getExistsLiveInstanceOrCurrentStateOrMessageChange();
 
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
@@ -560,7 +561,7 @@ public abstract class AbstractTaskDispatcher {
       excludeSet.addAll(assignedSet);
     }
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
-    addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+    addPartitionsReachedMaximumRetries(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
     Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
     excludeSet.addAll(partitionsWithDelay);
@@ -576,7 +577,8 @@ public abstract class AbstractTaskDispatcher {
     Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
     // If the job is a targeted job, in case of live instance change, we need to assign
     // non-terminal tasks so that they could be re-scheduled
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChange) {
       // This job is a targeted job, so FixedAssignmentCalculator will be used
       // There has been a live instance change. Must re-add incomplete task partitions to be
       // re-assigned and re-scheduled
@@ -612,7 +614,8 @@ public abstract class AbstractTaskDispatcher {
     }
 
     // If this is a targeted job and if there was a live instance change
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChange) {
       // Drop current jobs only if they are assigned to a different instance, regardless of
       // the jobCfg.isRebalanceRunningTask() setting
       dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
@@ -745,8 +748,12 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-  // add all partitions that have been tried maxNumberAttempts
-  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx,
+  // Add all partitions/tasks that cannot be retried. These are the tasks that:
+  // 1- are in the TASK_ABORTED or ERROR state, or
+  // 2- have just gone to the TIMED_OUT, TASK_ERROR, or DROPPED state and have reached
+  // their maxNumberAttempts.
+  // These tasks determine whether the job needs to be marked FAILED or not.
+  protected static void addGivenUpPartitions(Set<Integer> set, JobContext ctx,
       Iterable<Integer> pIds, JobConfig cfg) {
     for (Integer pId : pIds) {
       if (isTaskGivenup(ctx, cfg, pId)) {
@@ -755,6 +762,17 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
+  // Add all partitions that have reached their maxNumberAttempts. These tasks should not be
+  // considered for scheduling again.
+  protected static void addPartitionsReachedMaximumRetries(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds, JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask()) {
+        set.add(pId);
+      }
+    }
+  }
+
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, Set<Integer> throttled, int n) {
     List<Integer> result = new ArrayList<>();
@@ -829,7 +847,8 @@ public abstract class AbstractTaskDispatcher {
     if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
       return true;
     }
-    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
+    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR
+        || state == TaskPartitionState.DROPPED) {
       return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
     }
     return false;
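
The distinction between the two helpers above can be summed up in a small,
self-contained sketch (illustrative names, not Helix source): a task at its
retry cap is excluded from scheduling regardless of its current state, while
addGivenUpPartitions additionally treats TASK_ABORTED/ERROR tasks as
permanently failed.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.IntUnaryOperator;

    // Minimal model of the retry-cap exclusion added in this commit.
    public class RetryCapSketch {
      static Set<Integer> partitionsAtRetryCap(List<Integer> partitionIds,
          IntUnaryOperator attemptsOf, int maxAttemptsPerTask) {
        Set<Integer> excluded = new HashSet<>();
        for (int pId : partitionIds) {
          // Mirrors addPartitionsReachedMaximumRetries: once a task has used
          // up all of its attempts, it is never handed back to the scheduler.
          if (attemptsOf.applyAsInt(pId) >= maxAttemptsPerTask) {
            excluded.add(pId);
          }
        }
        return excluded;
      }

      public static void main(String[] args) {
        // Partition 0 has used 5 attempts, partition 1 has used 2; cap is 5.
        Set<Integer> excluded = partitionsAtRetryCap(List.of(0, 1), p -> p == 0 ? 5 : 2, 5);
        System.out.println(excluded); // prints [0]
      }
    }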
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c2b724b..b10eb5e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -245,7 +245,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
         partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
 
-    addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
+    addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
     if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
         || (jobCfg.getTargetResource() != null
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
index 2a12568..ae724f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
@@ -304,17 +304,17 @@ public class TestForceDeleteWorkflow extends TaskTestBase {
     //             JOB1 JOB2
 
     JobConfig.Builder jobBuilder0 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
new file mode 100644
index 0000000..8683c3f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Test to check whether the maximum number of attempts is respected while the target
+ * partition is switching continuously.
+ */
+public class TestMaxNumberOfAttemptsMasterSwitch extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private List<String> _assignmentList1;
+  private List<String> _assignmentList2;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _driver = new TaskDriver(_manager);
+
+    // Assignment1: localhost_12918: Master, localhost_12919: Slave, localhost_12920: Slave
+    _assignmentList1 = new ArrayList<>();
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+
+    // Assignment2: localhost_12919: Master, localhost_12918: Slave, localhost_12920: Slave
+    _assignmentList2 = new ArrayList<>();
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testMaxNumberOfAttemptsMasterSwitch() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    int maxNumberOfAttempts = 5;
+    assignCustomizedIdealState(_assignmentList1);
+
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(maxNumberOfAttempts)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+    String nameSpacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, "JOB0");
+
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.IN_PROGRESS);
+    boolean isAssignmentInIdealState = true;
+
+    // Switch the mastership back and forth (10 times) and make sure the task gets retried
+    // and its number of attempts gets incremented every time.
+    // Also make sure that the task won't be retried more than maxNumberOfAttempts times.
+    for (int i = 1; i <= 2 * maxNumberOfAttempts; i++) {
+      int expectedRetryNumber = Math.min(i, maxNumberOfAttempts);
+      Assert.assertTrue(TestHelper.verify(
+          () -> (_driver.getJobContext(nameSpacedJobName)
+              .getPartitionNumAttempts(0) == expectedRetryNumber),
+          TestHelper.WAIT_DURATION));
+      if (isAssignmentInIdealState) {
+        assignCustomizedIdealState(_assignmentList2);
+        verifyMastership(_assignmentList2);
+        isAssignmentInIdealState = false;
+      } else {
+        assignCustomizedIdealState(_assignmentList1);
+        verifyMastership(_assignmentList1);
+        isAssignmentInIdealState = true;
+      }
+    }
+
+    // Since the task reaches the max number of attempts, the job will fail.
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.FAILED);
+    Assert.assertEquals(_driver.getJobContext(nameSpacedJobName).getPartitionNumAttempts(0),
+        maxNumberOfAttempts);
+  }
+
+  private void assignCustomizedIdealState(List<String> assignmentList) {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DATABASE);
+    idealState.setPartitionState(DATABASE + "_0", assignmentList.get(0), "MASTER");
+    idealState.setPartitionState(DATABASE + "_0", assignmentList.get(1), "SLAVE");
+    idealState.setPartitionState(DATABASE + "_0", assignmentList.get(2), "SLAVE");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, DATABASE,
+        idealState);
+  }
+
+  private void verifyMastership(List<String> assignmentList) throws Exception {
+    String instance = assignmentList.get(0);
+    boolean isMasterSwitchedToCorrectInstance = TestHelper.verify(() -> {
+      ExternalView externalView =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DATABASE);
+      if (externalView == null) {
+        return false;
+      }
+      Map<String, String> stateMap = externalView.getStateMap(DATABASE + "_0");
+      if (stateMap == null) {
+        return false;
+      }
+      return "MASTER".equals(stateMap.get(instance));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isMasterSwitchedToCorrectInstance);
+  }
+}
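
A note on the loop bound in the test above (an editorial sketch, using the
test's own variables): mastership is flipped 2 * maxNumberOfAttempts = 10
times, but the expected attempt count is clamped, so the final five
iterations verify that the counter stops incrementing at the cap:

    // Expected attempt count per iteration i:
    // i:        1  2  3  4  5  6  7  8  9  10
    // expected: 1  2  3  4  5  5  5  5  5  5
    int expectedRetryNumber = Math.min(i, maxNumberOfAttempts);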
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 25cab50..08dc776 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -58,9 +58,9 @@ public class TestStopWorkflow extends TaskTestBase {
     stopTestSetup(5);
 
     String jobQueueName = TestHelper.getTestMethodName();
-    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+    JobConfig.Builder jobBuilder =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setWorkflow(jobQueueName)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index b79dcb9..e849be2 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -93,7 +93,7 @@ public class TestTargetedTaskStateChange {
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(true);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
@@ -130,7 +130,7 @@ public class TestTargetedTaskStateChange {
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(false);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())