You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/07/23 20:42:18 UTC

[GitHub] [helix] dasahcc commented on a change in pull request #1165: Quota calculation based on CurrentState

dasahcc commented on a change in pull request #1165:
URL: https://github.com/apache/helix/pull/1165#discussion_r459715342



##########
File path: helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
##########
@@ -176,6 +181,178 @@ public void buildAssignableInstances(ClusterConfig clusterConfig, TaskDataCache
     computeGlobalThreadBasedCapacity();
   }
 
+  /**
+   * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
+   * CurrentState. It re-computes the current quota profile for each AssignableInstance.
+   * If a task's current state is INIT or RUNNING, or if there is a pending message whose ToState
+   * is RUNNING, the task/partition is assigned to the AssignableInstance of that instance.
+   * Both internal maps are cleared first, and live instances that have no InstanceConfig are
+   * skipped.
+   * @param clusterConfig cluster config handed to every newly created AssignableInstance
+   * @param taskDataCache source of the JobConfig map and of each job's JobContext
+   * @param liveInstances live instances keyed by instance name
+   * @param instanceConfigs instance configs keyed by instance name
+   * @param currentStateOutput source of per-partition current states and pending messages
+   * @param resourceMap resources to scan; only resources using the task state model are used
+   */
+  public void buildAssignableInstancesFromCurrentState(ClusterConfig clusterConfig,
+      TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances,
+      Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput currentStateOutput,
+      Map<String, Resource> resourceMap) {
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
+
+    // Create all AssignableInstance objects based on what's in liveInstances
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+      if (instanceConfig == null) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+
+      // Create an AssignableInstance
+      AssignableInstance assignableInstance =
+          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+      _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
+      LOG.debug("AssignableInstance created for instance: {}", instanceName);
+    }
+
+    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
+    String initState = TaskPartitionState.INIT.name();
+    String runningState = TaskPartitionState.RUNNING.name();
+
+    // Update task profiles by traversing all CurrentStates
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resourceName = resourceEntry.getKey();
+      // Constant-first comparison so a resource without a state model ref is simply skipped
+      String stateModelDefRef = resourceEntry.getValue().getStateModelDefRef();
+      if (!TaskConstants.STATE_MODEL_NAME.equals(stateModelDefRef)) {
+        continue;
+      }
+
+      JobConfig jobConfig = jobConfigMap.get(resourceName);
+      JobContext jobContext = taskDataCache.getJobContext(resourceName);
+      String quotaType = getQuotaType(jobConfig);
+
+      Map<Partition, Map<String, String>> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName);
+      for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : currentStateMap
+          .entrySet()) {
+        Partition partition = currentStateMapEntry.getKey();
+        String taskId = getTaskID(jobConfig, jobContext, partition);
+        for (Map.Entry<String, String> instanceCurrentStateEntry : currentStateMapEntry.getValue()
+            .entrySet()) {
+          String assignedInstance = instanceCurrentStateEntry.getKey();
+          String taskState = instanceCurrentStateEntry.getValue();
+          // If a task is in INIT or RUNNING state on the instance, this task should occupy one
+          // quota from this instance.
+          if (initState.equals(taskState) || runningState.equals(taskState)) {
+            assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+          }
+        }
+      }
+
+      Map<Partition, Map<String, Message>> pendingMessageMap =
+          currentStateOutput.getPendingMessageMap(resourceName);
+      for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry : pendingMessageMap
+          .entrySet()) {
+        Partition partition = pendingMessageMapEntry.getKey();
+        String taskId = getTaskID(jobConfig, jobContext, partition);
+        for (Map.Entry<String, Message> instancePendingMessageEntry : pendingMessageMapEntry
+            .getValue().entrySet()) {
+          String assignedInstance = instancePendingMessageEntry.getKey();
+          String messageToState = instancePendingMessageEntry.getValue().getToState();
+          if (!runningState.equals(messageToState)) {
+            continue;
+          }
+          // If there is a pending message on the instance which has ToState of RUNNING, the task
+          // will run on the instance soon. So the task needs to occupy one quota on this instance.
+          // Tasks already in INIT or RUNNING were counted by the CurrentState pass above, so they
+          // are excluded here to avoid occupying quota twice.
+          String currentState =
+              currentStateOutput.getCurrentState(resourceName, partition, assignedInstance);
+          if (!initState.equals(currentState) && !runningState.equals(currentState)) {
+            assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
+          }
+        }
+      }
+    }
+    LOG.info(
+        "AssignableInstanceManager built AssignableInstances from scratch based on CurrentState.");
+    computeGlobalThreadBasedCapacity();
+  }
+
+  /**
+   * Restores the assignment of a task on the AssignableInstance registered for the given
+   * instance. When the restore succeeds, the result is recorded in the task assign result map;
+   * when the instance has no AssignableInstance, only a debug message is logged.
+   * @param instance name of the instance the task is assigned to
+   * @param jobConfig config of the job the task belongs to, used to look up the TaskConfig
+   * @param taskId id of the task being restored
+   * @param quotaType quota type the task occupies on the instance
+   */
+  private void assignTaskToInstance(String instance, JobConfig jobConfig, String taskId,
+      String quotaType) {
+    // Guard: nothing to restore on an instance that has no AssignableInstance
+    if (!_assignableInstanceMap.containsKey(instance)) {
+      LOG.debug(
+          "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
+              + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
+              + "up for this task. TaskId: {}, Instance: {}",
+          taskId, instance);
+      return;
+    }
+
+    TaskConfig restoredTaskConfig = getTaskConfig(jobConfig, taskId);
+    TaskAssignResult restoredResult = _assignableInstanceMap.get(instance)
+        .restoreTaskAssignResult(taskId, restoredTaskConfig, quotaType);
+    if (!restoredResult.isSuccessful()) {
+      return;
+    }
+
+    _taskAssignResultMap.put(taskId, restoredResult);
+    LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
+        instance);
+  }
+
+  /**
+   * Extract the quota type information of the Job
+   * @param jobConfig
+   * @return
+   */
+  private String getQuotaType(JobConfig jobConfig) {
+    // If jobConfig is null (job has been deleted but participant has not dropped the task yet), use
+    // default quota for the task
+    if (jobConfig == null) {
+      return AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+    String quotaType = jobConfig.getJobType();
+    if (quotaType == null) {
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }

Review comment:
       Let's combine the two null checks into a single condition:
   
   if (jobConfig == null || jobConfig.getJobType() == null) {
    return AssignableInstance.DEFAULT_QUOTA_TYPE;
   }
   return jobConfig.getJobType();
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org