Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/10/22 00:58:34 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1468: Implement deleteTask API

jiajunwang commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r509802673



##########
File path: helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
##########
@@ -169,17 +170,16 @@
         "/{clusterName}/PROPERTYSTORE/TaskRebalancer/{workflowName}/Context"); // Old
     // WorkflowContext
     // path
-    addEntry(PropertyType.TASK_CONFIG_ROOT, 1, "/{clusterName}/CONFIGS/TASK");
-    addEntry(PropertyType.WORKFLOW_CONFIG, 3,
-        "/{clusterName}/CONFIGS/TASK/{workflowName}/{workflowName}");
-    addEntry(PropertyType.JOB_CONFIG, 4,
-        "/{clusterName}/CONFIGS/TASK/{workflowName}/{jobName}/{jobName}");
+    addEntry(PropertyType.TASK_CONFIG_ROOT, 1, "/{clusterName}/CONFIGS/RESOURCE");

Review comment:
       I think we introduced these "new" path patterns in order to refactor the Task Framework (TF) structure. While I believe that is still the future design, I agree that we want to make the paths work first. Can we add a TODO here?

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobContext.java
##########
@@ -313,4 +313,12 @@ public long getExecutionStartTime() {
     }
     return map;
   }
+
+  /**
+   * Remove the task from the map field of job context.
+   * @param taskNumber
+   */
+  public void removeTask(int taskNumber) {

Review comment:
       nit, taskSeqNumber?

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobContext.java
##########
@@ -313,4 +313,12 @@ public long getExecutionStartTime() {
     }
     return map;
   }
+
+  /**
+   * Remove the task from the map field of job context.
+   * @param taskNumber
+   */
+  public void removeTask(int taskNumber) {

Review comment:
       Moreover, according to the naming convention of this class, I guess we are using "partition" instead of "task".

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -699,6 +798,73 @@ private void addTaskToJobConfig(String workflowName, String jobName, TaskConfig
     throw new TimeoutException("An unexpected issue happened while task being added to the job!");
   }
 
+  /**
+   * A helper method which deletes an existing task from the job config and verifies if task is
+   * deleted from the context by the controller.
+   * @param workflowName
+   * @param jobName
+   * @param taskID
+   * @param endTime
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  private void deleteTaskFromJobConfig(String workflowName, String jobName, String taskID,

Review comment:
       I haven't reviewed this code completely, but my guess is that this method differs from addTaskToJobConfig() only in the updater part. Everything else is the same. Could you please merge the two?
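
A minimal sketch of what the merge suggested above could look like, assuming the add and delete paths really do differ only in the updater and in the check that confirms the change. The class `JobConfigModifier`, its in-memory map, and the `modify` helper are hypothetical stand-ins for illustration and are not Helix APIs; in the PR the verification would look at the job context written by the controller rather than at the same map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the shared add/delete flow; not the Helix TaskDriver.
class JobConfigModifier {
  // Assumption: an in-memory map (task ID -> command) stands in for the job config.
  private final Map<String, String> taskConfigs = new HashMap<>();

  // Returns a copy so callers cannot mutate the stored config.
  Map<String, String> snapshot() {
    return new HashMap<>(taskConfigs);
  }

  // Shared flow: apply the updater once, then poll until the change is
  // verified or the deadline passes.
  void modify(UnaryOperator<Map<String, String>> updater,
      Predicate<Map<String, String>> verified, long endTime)
      throws TimeoutException, InterruptedException {
    Map<String, String> updated = updater.apply(new HashMap<>(taskConfigs));
    taskConfigs.clear();
    taskConfigs.putAll(updated);
    while (System.currentTimeMillis() <= endTime) {
      if (verified.test(taskConfigs)) {
        return;
      }
      Thread.sleep(10);
    }
    throw new TimeoutException("Could not verify the job config change in time");
  }

  // Only the updater and the verification differ between the two operations.
  void addTask(String taskId, String command, long endTime)
      throws TimeoutException, InterruptedException {
    modify(m -> { m.put(taskId, command); return m; }, m -> m.containsKey(taskId), endTime);
  }

  void deleteTask(String taskId, long endTime)
      throws TimeoutException, InterruptedException {
    modify(m -> { m.remove(taskId); return m; }, m -> !m.containsKey(taskId), endTime);
  }
}
```

With this shape, addTaskToJobConfig() and deleteTaskFromJobConfig() would each reduce to a single call that passes its own updater and verification check.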

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -607,14 +615,101 @@ public void addTask(String workflowName, String jobName, TaskConfig taskConfig,
     addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
   }
 
+  /**
+   * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+   * Timeout for this operation is the default timeout which is 5 minutes.
+   * {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+   * from the job if the job is in-progress or it has not started yet.
+   * Note2: The tasks can only be deleted from non-targeted jobs.
+   * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully deleted or not.
+   * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+   * or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskID
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in the states to accept a new task or if there is any
+   *           issue in updating jobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID)
+      throws TimeoutException, InterruptedException {
+    deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+   * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+   * from the job if the job is in-progress or it has not started yet.
+   * Note2: The tasks can only be deleted from non-targeted jobs.
+   * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully deleted or not.
+   * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+   * or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskID
+   * @param timeoutMs
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in the states to accept a new task or if there is any
+   *           issue in updating jobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID, long timeoutMs)
+      throws TimeoutException, InterruptedException {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = getJobConfig(nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new IllegalArgumentException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    TaskConfig taskConfig = null;
+    Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap();
+    for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) {
+      if (entry.getKey().equals(taskID)) {
+        taskConfig = entry.getValue();
+      }
+    }
+
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);

Review comment:
       Simple code is what I like : ) This also applies to addTask().
   
   ```
   // Maybe remove the 2nd condition (jobContext != null)?
   if (workflowContext != null && jobContext != null) {
     TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
     if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
       throw new HelixException("Job " + nameSpaceJobName
           + " is in illegal state for task deletion. Job State is " + jobState);
     }
   }
   deleteTaskFromJobConfig(workflowName, jobName, taskID, endTime);
   ```

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -238,6 +238,10 @@ private ResourceAssignment computeResourceMapping(String jobResource,
 
     updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
+    // Find the tasks that have been removed form the config, add them to TasksToDrop
+    handleDeletedTasks(jobResource, jobCfg, jobCtx, currentInstanceToTaskAssignments, tasksToDrop,

Review comment:
       I think the calculation should be done in the taskAssignmentCal, so that allPartitions won't contain the removed tasks at all. Then the following logic is simple: we only match the current assignment against the expected task list. If anything mismatches, we add or remove accordingly.
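
The matching step described above can be sketched as two set differences, assuming tasks are identified by plain string IDs. `AssignmentDiff`, `toAdd`, and `toDrop` are hypothetical names for illustration; they are not part of Helix's TaskAssignmentCalculator.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares the expected task list with the current assignment.
class AssignmentDiff {
  // Tasks that are expected but not currently assigned.
  static Set<String> toAdd(Set<String> expected, Set<String> current) {
    Set<String> result = new TreeSet<>(expected);
    result.removeAll(current);
    return result;
  }

  // Tasks that are currently assigned but no longer expected,
  // for example because they were deleted from the job config.
  static Set<String> toDrop(Set<String> expected, Set<String> current) {
    Set<String> result = new TreeSet<>(current);
    result.removeAll(expected);
    return result;
  }
}
```

If the expected list already excludes deleted tasks, the dispatcher would not need a separate pass for them: they simply show up in the set returned by `toDrop`.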

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -478,4 +482,61 @@ private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
     }
     return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
   }
+
+  /**
+   * Find the tasks that have been removed from job config, ass them to tasksToDrop. If task's

Review comment:
       ass => add



