Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:44 UTC

[helix] 08/10: Quota calculation based on CurrentState (#1165)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 82c4640d0c2ed79ae69804a28cbaeb1e086d4342
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Jul 24 11:12:28 2020 -0700

    Quota calculation based on CurrentState (#1165)
    
    Calculate quota based on CurrentState
    
    In this commit, new methods have been added to AssignableInstanceManager
    that allow the controller to calculate quota based on CurrentState and
    pending messages.
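    
    As a minimal sketch (plain Java, not code from this commit), the occupancy
    rule the new path implements is: a task takes one quota slot on an instance
    when its CurrentState is INIT or RUNNING, or when a pending message has
    ToState RUNNING while the CurrentState is not already INIT/RUNNING, so no
    task is counted twice.
    
        // Hedged illustration; names are illustrative, not the Helix API.
        static boolean occupiesQuota(String currentState, String pendingToState) {
          boolean activeNow = "INIT".equals(currentState)
              || "RUNNING".equals(currentState);
          boolean aboutToRun = "RUNNING".equals(pendingToState) && !activeNow;
          return activeNow || aboutToRun;
        }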
---
 .../WorkflowControllerDataProvider.java            |  11 --
 .../controller/stages/CurrentStateOutput.java      |  14 +-
 .../stages/task/TaskSchedulingStage.java           |   8 +
 .../helix/task/AssignableInstanceManager.java      | 172 +++++++++++++++++++
 .../helix/integration/task/TestStuckTaskQuota.java | 189 +++++++++++++++++++++
 5 files changed, 382 insertions(+), 12 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index d5bc11e..45e1319 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -92,17 +92,6 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
     // Refresh TaskCache
     _taskDataCache.refresh(accessor, getResourceConfigMap());
 
-    // Refresh AssignableInstanceManager
-    AssignableInstanceManager assignableInstanceManager =
-        _taskDataCache.getAssignableInstanceManager();
-
-    // Build from scratch every time
-    assignableInstanceManager.buildAssignableInstances(getClusterConfig(), _taskDataCache,
-        getLiveInstances(), getInstanceConfigMap());
-
-    // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being
-    assignableInstanceManager.logQuotaProfileJSON(false);
-
     long duration = System.currentTimeMillis() - startTime;
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
         "END: WorkflowControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index dc82a61..a81fd2c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -273,7 +273,7 @@ public class CurrentStateOutput {
   }
 
   /**
-   * Given resource, returns current state map (parition -> instance -> currentState)
+   * Given resource, returns current state map (partition -> instance -> currentState)
    * @param resourceName
    * @return
    */
@@ -338,6 +338,18 @@ public class CurrentStateOutput {
   }
 
   /**
+   * Given resource, returns pending message map (partition -> instance -> message)
+   * @param resourceName
+   * @return
+   */
+  public Map<Partition, Map<String, Message>> getPendingMessageMap(String resourceName) {
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      return _pendingMessageMap.get(resourceName);
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
    * Get the partitions mapped in the current state
    * @param resourceId resource to look up
    * @return set of mapped partitions, or empty set if there are none
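
A hedged usage sketch for the new accessor (the surrounding loop is
illustrative; only getPendingMessageMap itself comes from this commit). It
mirrors how AssignableInstanceManager below walks partition -> instance ->
message and checks each message's ToState:

    // resourceName and currentStateOutput are assumed to be in scope.
    Map<Partition, Map<String, Message>> pending =
        currentStateOutput.getPendingMessageMap(resourceName);
    for (Map.Entry<Partition, Map<String, Message>> byPartition : pending.entrySet()) {
      for (Map.Entry<String, Message> byInstance : byPartition.getValue().entrySet()) {
        String instance = byInstance.getKey();
        String toState = byInstance.getValue().getToState();
        // A ToState of RUNNING marks the task as about to occupy quota on
        // this instance (see TaskSchedulingStage and AssignableInstanceManager below).
      }
    }
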
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 430de4b..5b4d580 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -76,6 +76,14 @@ public class TaskSchedulingStage extends AbstractBaseStage {
           "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
+
+    // Build quota capacity based on Current State and Pending Messages
+    cache.getAssignableInstanceManager().buildAssignableInstancesFromCurrentState(
+        cache.getClusterConfig(), cache.getTaskDataCache(), cache.getLiveInstances(), cache.getInstanceConfigMap(),
+        currentStateOutput, resourceMap);
+
+    cache.getAssignableInstanceManager().logQuotaProfileJSON(false);
+
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
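
For context, a hedged sketch of how this stage's inputs are obtained. The
attribute names are inferred from the "Requires CURRENT_STATE|RESOURCES|DataCache"
error message above and are not verified against this exact Helix version:

    // Assumed wiring inside TaskSchedulingStage.process(event); illustrative only.
    CurrentStateOutput currentStateOutput =
        event.getAttribute(AttributeName.CURRENT_STATE.name());
    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
    WorkflowControllerDataProvider cache =
        event.getAttribute(AttributeName.ControllerDataProvider.name());
    // Quota is rebuilt once per pipeline run here, replacing the per-refresh
    // build removed from WorkflowControllerDataProvider.refresh() above.
    cache.getAssignableInstanceManager().buildAssignableInstancesFromCurrentState(
        cache.getClusterConfig(), cache.getTaskDataCache(), cache.getLiveInstances(),
        cache.getInstanceConfigMap(), currentStateOutput, resourceMap);
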
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index eb966ac..cca9335 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -27,9 +27,13 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
 import org.apache.helix.task.assigner.AssignableInstance;
 import org.apache.helix.task.assigner.TaskAssignResult;
 import org.codehaus.jackson.JsonNode;
@@ -177,6 +181,174 @@ public class AssignableInstanceManager {
   }
 
   /**
+   * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
+   * CurrentState. It re-computes the current quota profile for each AssignableInstance.
+   * If a task's current state is INIT or RUNNING, or if there is a pending message whose
+   * ToState is RUNNING, the task/partition will occupy quota on that instance's AssignableInstance.
+   * @param clusterConfig
+   * @param taskDataCache
+   * @param liveInstances
+   * @param instanceConfigs
+   * @param currentStateOutput
+   * @param resourceMap
+   */
+  public void buildAssignableInstancesFromCurrentState(ClusterConfig clusterConfig,
+      TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances,
+      Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput currentStateOutput,
+      Map<String, Resource> resourceMap) {
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
+
+    // Create all AssignableInstance objects based on what's in liveInstances
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      if (!instanceConfigs.containsKey(instanceName)) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+
+      // Create an AssignableInstance
+      AssignableInstance assignableInstance =
+          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+      _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
+      LOG.debug("AssignableInstance created for instance: {}", instanceName);
+    }
+
+    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
+
+    // Update task profiles by traversing all CurrentStates
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resourceName = resourceEntry.getKey();
+      if (resourceEntry.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+        JobConfig jobConfig = jobConfigMap.get(resourceName);
+        JobContext jobContext = taskDataCache.getJobContext(resourceName);
+        String quotaType = getQuotaType(jobConfig);
+        Map<Partition, Map<String, String>> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName);
+        for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : currentStateMap
+            .entrySet()) {
+          Partition partition = currentStateMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, String> instanceCurrentStateEntry : currentStateMapEntry.getValue()
+              .entrySet()) {
+            String assignedInstance = instanceCurrentStateEntry.getKey();
+            String taskState = instanceCurrentStateEntry.getValue();
+            // If a task is in INIT or RUNNING state on the instance, it should occupy one
+            // quota slot on this instance.
+            if (taskState.equals(TaskPartitionState.INIT.name())
+                || taskState.equals(TaskPartitionState.RUNNING.name())) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+            }
+          }
+        }
+        Map<Partition, Map<String, Message>> pendingMessageMap =
+            currentStateOutput.getPendingMessageMap(resourceName);
+        for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry : pendingMessageMap
+            .entrySet()) {
+          Partition partition = pendingMessageMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, Message> instancePendingMessageEntry : pendingMessageMapEntry
+              .getValue().entrySet()) {
+            String assignedInstance = instancePendingMessageEntry.getKey();
+            String messageToState = instancePendingMessageEntry.getValue().getToState();
+            // If there is a pending message on the instance whose ToState is RUNNING, the task
+            // will run on the instance soon, so it needs to occupy one quota slot on this instance.
+            if (messageToState.equals(TaskPartitionState.RUNNING.name())
+                && !TaskPartitionState.INIT.name().equals(
+                    currentStateOutput.getCurrentState(resourceName, partition, assignedInstance))
+                && !TaskPartitionState.RUNNING.name().equals(currentStateOutput
+                    .getCurrentState(resourceName, partition, assignedInstance))) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+            }
+          }
+        }
+      }
+    }
+    LOG.info(
+        "AssignableInstanceManager built AssignableInstances from scratch based on CurrentState and pending messages.");
+    computeGlobalThreadBasedCapacity();
+  }
+
+  /**
+   * Assign the task to the instance's AssignableInstance.
+   * @param instance
+   * @param jobConfig
+   * @param taskId
+   * @param quotaType
+   */
+  private void assignTaskToInstance(String instance, JobConfig jobConfig, String taskId,
+      String quotaType) {
+    if (_assignableInstanceMap.containsKey(instance)) {
+      TaskConfig taskConfig = getTaskConfig(jobConfig, taskId);
+      AssignableInstance assignableInstance = _assignableInstanceMap.get(instance);
+      TaskAssignResult taskAssignResult =
+          assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
+      if (taskAssignResult.isSuccessful()) {
+        _taskAssignResultMap.put(taskId, taskAssignResult);
+        LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
+            instance);
+      }
+    } else {
+      LOG.debug(
+          "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
+              + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
+              + "up for this task. TaskId: {}, Instance: {}",
+          taskId, instance);
+    }
+  }
+
+  /**
+   * Extract the quota type of the job.
+   * @param jobConfig
+   * @return
+   */
+  private String getQuotaType(JobConfig jobConfig) {
+    // If jobConfig is null (job has been deleted but participant has not dropped the task yet), use
+    // default quota for the task
+    if (jobConfig == null || jobConfig.getJobType() == null) {
+      return AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+    return jobConfig.getJobType();
+  }
+
+  /**
+   * Calculate the TaskID based on the JobConfig and JobContext information
+   * @param jobConfig
+   * @param jobContext
+   * @param partition
+   * @return
+   */
+  private String getTaskID(JobConfig jobConfig, JobContext jobContext, Partition partition) {
+    if (jobConfig == null || jobContext == null) {
+      // If JobConfig or JobContext is null, use the partition name
+      return partition.getPartitionName();
+    }
+    int taskIndex = TaskUtil.getPartitionId(partition.getPartitionName());
+    String taskId = jobContext.getTaskIdForPartition(taskIndex);
+    if (taskId == null) {
+      // For targeted tasks, taskId will be null
+      // We instead use pName (see FixedTargetTaskAssignmentCalculator)
+      taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
+    }
+    return taskId;
+  }
+
+  /**
+   * Returns the TaskConfig of a task based on the JobConfig information.
+   * @param jobConfig
+   * @param taskId
+   * @return
+   */
+  private TaskConfig getTaskConfig(JobConfig jobConfig, String taskId) {
+    if (jobConfig == null) {
+      return new TaskConfig(null, null, taskId, null);
+    }
+    return jobConfig.getTaskConfig(taskId);
+  }
+
+  /**
    * Updates AssignableInstances when there are changes in LiveInstances or InstanceConfig. This
    * update only keeps an up-to-date count of AssignableInstances and does NOT re-build tasks
    * (because it's costly).
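
To make the quota math concrete, a minimal, self-contained sketch (assumed
names and capacity, not Helix classes) of how restored assignments exhaust an
instance's quota. This is the situation TestStuckTaskQuota below sets up: 40
stuck RUNNING tasks are restored onto participant 0, leaving it no capacity,
so new tasks land on participant 1.

    import java.util.HashMap;
    import java.util.Map;

    class QuotaSketch {
      public static void main(String[] args) {
        int capacityPerInstance = 40; // assumed default thread-based capacity
        Map<String, Integer> used = new HashMap<>();
        // Restoring 40 RUNNING tasks from CurrentState consumes the whole quota.
        for (int i = 0; i < 40; i++) {
          used.merge("participant_0", 1, Integer::sum);
        }
        boolean hasRoom = used.getOrDefault("participant_0", 0) < capacityPerInstance;
        System.out.println(hasRoom); // false -> the next task must go to participant_1
      }
    }
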
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
new file mode 100644
index 0000000..a118c6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
@@ -0,0 +1,189 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+
+public class TestStuckTaskQuota extends TaskTestBase {
+  private CountDownLatch latch = new CountDownLatch(1);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 2;
+    super.beforeClass();
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+    _participants = new MockParticipantManager[_numNodes];
+
+    // Start first participant
+    startParticipantAndRegisterNewMockTask(0);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testStuckTaskQuota() throws Exception {
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    String workflowName3 = TestHelper.getTestMethodName() + "_3";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 =
+        new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(40)
+            .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    JobConfig.Builder jobBuilder2 = new JobConfig.Builder().setWorkflow(workflowName2)
+        .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    JobConfig.Builder jobBuilder3 = new JobConfig.Builder().setWorkflow(workflowName3)
+        .setNumberOfTasks(1).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+    Workflow.Builder workflowBuilder2 =
+        new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+    Workflow.Builder workflowBuilder3 =
+        new Workflow.Builder(workflowName3).addJob(jobName, jobBuilder3);
+
+    _driver.start(workflowBuilder1.build());
+
+    // Make sure the JOB0 of workflow1 is started and all of the tasks are assigned to
+    // participant 0
+    _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+        TaskState.IN_PROGRESS);
+
+    String participant0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    for (int i = 0; i < 40; i++) {
+      int finalI = i;
+      Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING
+          .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName))
+              .getPartitionState(finalI))
+          && participant0
+              .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName))
+                  .getAssignedParticipant(finalI))),
+          TestHelper.WAIT_DURATION));
+    }
+
+    // Start the second participant
+    startParticipantAndRegisterNewMockTask(1);
+
+    _driver.start(workflowBuilder2.build());
+    // Make sure the JOB0 of workflow2 is started and the only task of this job is assigned to
+    // participant1
+    _driver.pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName),
+        TaskState.IN_PROGRESS);
+    String participant1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
+    Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING.equals(_driver
+        .getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName)).getPartitionState(0))
+        && participant1
+            .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName2, jobName))
+                .getAssignedParticipant(0))),
+        TestHelper.WAIT_DURATION));
+
+    // Delete the workflow1
+    _driver.delete(workflowName1);
+
+    // Since the tasks of workflow1 will be stuck after the deletion, participant 0 is out of
+    // capacity. Hence, the new tasks should be assigned to participant 1
+    _driver.start(workflowBuilder3.build());
+
+    // Make sure the JOB0 of workflow3 is started and the only task of this job is assigned to
+    // participant1
+    _driver.pollForJobState(workflowName3, TaskUtil.getNamespacedJobName(workflowName3, jobName),
+        TaskState.IN_PROGRESS);
+
+    Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING.equals(_driver
+        .getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName)).getPartitionState(0))
+        && participant1
+            .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName3, jobName))
+                .getAssignedParticipant(0))),
+        TestHelper.WAIT_DURATION));
+    latch.countDown();
+    // Stop the workflow2 and workflow3
+    _driver.waitToStop(workflowName2, 5000L);
+    _driver.waitToStop(workflowName3, 5000L);
+  }
+
+  private void startParticipantAndRegisterNewMockTask(int participantIndex) {
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);
+    String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + participantIndex);
+    _participants[participantIndex] =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+    // Register a Task state model factory.
+    StateMachineEngine stateMachine = _participants[participantIndex].getStateMachineEngine();
+    stateMachine.registerStateModelFactory("Task",
+        new TaskStateModelFactory(_participants[participantIndex], taskFactoryReg));
+    _participants[participantIndex].syncStart();
+  }
+
+  /**
+   * A mock task that extends MockTask and blocks in cancel() until the latch is released,
+   * keeping the task stuck so it continues to occupy quota.
+   */
+  private class NewMockTask extends MockTask {
+
+    NewMockTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        latch.await();
+      } catch (Exception e) {
+        // Pass
+      }
+      super.cancel();
+    }
+  }
+}
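
For reference, a hedged way to run only this test locally (standard Maven
Surefire flags; the helix-core module is taken from the paths above):

    mvn test -pl helix-core -Dtest=TestStuckTaskQuota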