Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:06:55 UTC

[GitHub] [helix] alirezazamani commented on a change in pull request #1468: Implement deleteTask API

alirezazamani commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r516265126



##########
File path: helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
##########
@@ -146,6 +147,20 @@ public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
     return taskAssignment;
   }
 
+  @Override
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {

Review comment:
      I think it is safe to keep it as it is, because the same rule applies to the other methods. For example, getAllTaskPartitions is identical in both classes.
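
As the comment notes, the two calculators keep identical copies of getRemovedPartitions, mirroring how getAllTaskPartitions is duplicated today. Below is a minimal sketch of what lifting the shared body into the common TaskAssignmentCalculator base class could look like; the base-class placement is only a suggestion for illustration, not the PR's actual change, and the method body follows the GenericTaskAssignmentCalculator diff later in this message.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.JobContext;

    // Sketch only: the real base class lives in org.apache.helix.task and has more members.
    public abstract class TaskAssignmentCalculator {
      /**
       * Partitions that still exist in the JobContext but whose task configs have been
       * removed from the JobConfig.
       */
      public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext,
          Set<Integer> allPartitions) {
        Set<Integer> deletedPartitions = new HashSet<>();
        for (Integer partition : jobContext.getPartitionSet()) {
          String taskId = jobContext.getTaskIdForPartition(partition);
          if (!jobConfig.getTaskConfigMap().containsKey(taskId)) {
            deletedPartitions.add(partition);
          }
        }
        return deletedPartitions;
      }

      // getAllTaskPartitions, getTaskAssignment, etc. would remain abstract as today.
    }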

##########
File path: helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
##########
@@ -95,6 +97,20 @@
     return placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
   }
 
+  @Override
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
+    // Get all partitions that exist in the context
+    Set<Integer> deletedPartitions = new HashSet<>();
+    // Check whether their tasks have been deleted from the JobConfig
+    for (Integer partition : jobContext.getPartitionSet()) {
+      String partitionID = jobContext.getTaskIdForPartition(partition);
+      if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+        deletedPartitions.add(partition);
+      }
+    }

Review comment:
       I will sync up with you on this offline.
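
For readers following the thread, the check under discussion can be exercised in isolation. The following self-contained illustration runs the same "deleted partition" computation over plain maps; the class name and toy task IDs are made up for the example and are not part of the PR.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class RemovedPartitionsSketch {
      public static void main(String[] args) {
        // Task configs currently present in the JobConfig (task ID -> config placeholder).
        Map<String, String> taskConfigMap = new HashMap<>();
        taskConfigMap.put("task_0", "cfg");
        // "task_1" was deleted from the JobConfig, so it is intentionally absent here.

        // Partition -> task ID mapping as the JobContext remembers it.
        Map<Integer, String> contextTaskIds = new HashMap<>();
        contextTaskIds.put(0, "task_0");
        contextTaskIds.put(1, "task_1");

        // Same check as the diff: a partition counts as removed when its task ID is
        // no longer present in the JobConfig's task config map.
        Set<Integer> deletedPartitions = new HashSet<>();
        for (Map.Entry<Integer, String> entry : contextTaskIds.entrySet()) {
          if (!taskConfigMap.containsKey(entry.getValue())) {
            deletedPartitions.add(entry.getKey());
          }
        }
        System.out.println(deletedPartitions); // prints [1]
      }
    }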

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName, TaskConfig taskConfig,
 
     long endTime = System.currentTimeMillis() + timeoutMs;
 
-    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
 
     String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
     WorkflowContext workflowContext = getWorkflowContext(workflowName);
     JobContext jobContext = getJobContext(nameSpaceJobName);
+    // If the workflow context or the job context is null, the job has not started yet, so the
+    // task can be added to the job
+    if (workflowContext != null && jobContext != null) {
+      TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+      if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+        throw new HelixException("Job " + nameSpaceJobName
+            + " is in illegal state for task addition. Job State is " + jobState);
+      }
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    updateTaskInJobConfig(workflowName, jobName, updater);
+
+    workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
     if (workflowContext == null || jobContext == null) {
-      // Workflow context or job context is null. It means job has not been started. Hence task can
-      // be added to the job
-      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
       return;
     }
 
-    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+          && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+        return;
+      }
 
-    if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
-      throw new HelixException(
-          String.format("Job %s is in illegal state to accept new task. Job State is %s",
-              nameSpaceJobName, jobState));
+      Thread.sleep(DEFAULT_SLEEP);
+    }
+    throw new TimeoutException("An unexpected issue happened while task being added to the job!");
+  }
+
+  /**
+   * Delete an existing task from a running (IN_PROGRESS) job or a job that has not started yet.
+   * The timeout for this operation is the default timeout of 5 minutes
+   * ({@link TaskDriver#DEFAULT_TIMEOUT}).
+   * Note1: A task can only be deleted from a job that is in progress or has not started yet;
+   * deleting a task from a job in any other state is rejected.
+   * Note2: Tasks can only be deleted from non-targeted jobs.
+   * Note3: If a TimeoutException is thrown, it is the caller's responsibility to check whether
+   * the task has actually been deleted.
+   * Note4: The timeout bounds how long this API keeps checking whether the task has been
+   * successfully deleted.
+   * @param workflowName the name of the workflow that contains the job
+   * @param jobName the name of the job that contains the task
+   * @param taskID the ID of the task to be deleted
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in a state that allows task deletion or if there is
+   *           any issue in updating the JobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID)
+      throws TimeoutException, InterruptedException {
+    deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Delete an existing task from a running (IN_PROGRESS) job or a job that has not started yet.
+   * Note1: A task can only be deleted from a job that is in progress or has not started yet;
+   * deleting a task from a job in any other state is rejected.
+   * Note2: Tasks can only be deleted from non-targeted jobs.
+   * Note3: If a TimeoutException is thrown, it is the caller's responsibility to check whether
+   * the task has actually been deleted.
+   * Note4: The timeout bounds how long this API keeps checking whether the task has been
+   * successfully deleted.
+   * @param workflowName the name of the workflow that contains the job
+   * @param jobName the name of the job that contains the task
+   * @param taskID the ID of the task to be deleted
+   * @param timeoutMs how long, in milliseconds, this API keeps checking whether the task has been
+   *          successfully deleted
+   * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+   * @throws IllegalArgumentException if the inputs are invalid
+   * @throws HelixException if the job is not in a state that allows task deletion or if there is
+   *           any issue in updating the JobConfig.
+   */
+  public void deleteTask(String workflowName, String jobName, String taskID, long timeoutMs)
+      throws TimeoutException, InterruptedException {
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = getJobConfig(nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new IllegalArgumentException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    TaskConfig taskConfig = null;
+    Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap();
+    for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) {
+      if (entry.getKey().equals(taskID)) {
+        taskConfig = entry.getValue();
+      }
+    }
+
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    // If the workflow context or the job context is null, the job has not started yet, so the
+    // task can be deleted from the job
+    if (workflowContext != null && jobContext != null) {
+      TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+      if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+        throw new HelixException("Job " + nameSpaceJobName
+            + " is in illegal state for task deletion. Job State is " + jobState);
+      }
+    }
+
+    DataUpdater<ZNRecord> taskRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          Map<String, Map<String, String>> taskMap = currentData.getMapFields();
+          if (taskMap == null) {
+            LOG.warn("Could not update the jobConfig: " + jobName + " Znode MapField is null.");
+            return null;
+          }
+          Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>();
+          for (Map.Entry<String, Map<String, String>> entry : taskMap.entrySet()) {
+            if (!entry.getKey().equals(taskID)) {
+              newTaskMap.put(entry.getKey(), entry.getValue());
+            }
+          }
+          currentData.setMapFields(newTaskMap);
+        }
+        return currentData;
+      }
+    };
+
+    updateTaskInJobConfig(workflowName, jobName, taskRemover);
+
+    workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
+    if (workflowContext == null || jobContext == null) {
+      return;
     }
-    addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)
+          && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {

Review comment:
      Yeah, I agree. Removed the IN_PROGRESS condition.
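
With the IN_PROGRESS check removed, the wait loop presumably only polls the JobContext until the deleted task's ID disappears. A hedged sketch of the simplified loop, reusing the identifiers from the diff above as a fragment (the exact merged code may differ):

    while (System.currentTimeMillis() <= endTime) {
      jobContext =
          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
      // Done once the deleted task's ID no longer maps to a partition in the context;
      // the workflow/job state is no longer consulted here.
      if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)) {
        return;
      }
      Thread.sleep(DEFAULT_SLEEP);
    }
    throw new TimeoutException("Could not verify that task " + taskID + " was deleted in time.");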

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName, TaskConfig taskConfig,
 
     long endTime = System.currentTimeMillis() + timeoutMs;
 
-    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+    validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
 
     String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
     WorkflowContext workflowContext = getWorkflowContext(workflowName);
     JobContext jobContext = getJobContext(nameSpaceJobName);
+    // If the workflow context or the job context is null, the job has not started yet, so the
+    // task can be added to the job
+    if (workflowContext != null && jobContext != null) {
+      TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+      if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+        throw new HelixException("Job " + nameSpaceJobName
+            + " is in illegal state for task addition. Job State is " + jobState);
+      }
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    updateTaskInJobConfig(workflowName, jobName, updater);
+
+    workflowContext =
+        _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+    jobContext =
+        _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
     if (workflowContext == null || jobContext == null) {
-      // Workflow context or job context is null. It means job has not been started. Hence task can
-      // be added to the job
-      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
       return;
     }
 
-    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      jobContext =
+          _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+      workflowContext =
+          _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+      if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+          && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {

Review comment:
      Added a Javadoc note. Thanks.
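
Since the Javadoc notes now spell out the semantics, here is a hedged end-to-end sketch of how a client might drive both APIs once this lands. The HelixManager wiring, workflow/job/task names, command, and timeout values are illustrative assumptions; TaskConfig.Builder and the TaskDriver constructor come from the existing Helix API, but treat the snippet as a sketch rather than the PR's own documentation.

    import java.util.concurrent.TimeoutException;

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskConfig;
    import org.apache.helix.task.TaskDriver;

    public class TaskModificationExample {
      public static void addThenDeleteTask(HelixManager manager) throws InterruptedException {
        TaskDriver driver = new TaskDriver(manager);

        // Hypothetical names; the job must be non-targeted and either not started yet or
        // IN_PROGRESS, per the Javadoc notes in the diff above.
        String workflow = "myWorkflow";
        String job = "myJob";

        TaskConfig taskConfig = new TaskConfig.Builder()
            .setTaskId("task_12345")
            .setCommand("Reindex")
            .build();

        try {
          driver.addTask(workflow, job, taskConfig, 60_000L);
          driver.deleteTask(workflow, job, taskConfig.getId(), 60_000L);
        } catch (TimeoutException e) {
          // Per the Javadoc: on timeout the caller must verify whether the modification
          // actually took effect before retrying.
        }
      }
    }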

##########
File path: helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
##########
@@ -95,6 +97,20 @@
     return placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
   }
 
+  @Override
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
+    // Get all partitions that exist in the context
+    Set<Integer> deletedPartitions = new HashSet<>();
+    // Check whether their tasks have been deleted from the JobConfig
+    for (Integer partition : jobContext.getPartitionSet()) {
+      String partitionID = jobContext.getTaskIdForPartition(partition);
+      if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+        deletedPartitions.add(partition);
+      }
+    }

Review comment:
       Synced offline. Resolved.

##########
File path: helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
##########
@@ -146,6 +147,20 @@ public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
     return taskAssignment;
   }
 
+  @Override
+  public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {

Review comment:
       As we discussed offline, I will address this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org