You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/10/06 20:36:18 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1439: Implement addTask API

jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500574998



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);

Review comment:
       The default timeout is 5 mins. It is pretty long. Shall we just provide one method that returns immediately and one that waits for a timeout?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =

Review comment:
       Please define this as a private static field of the class.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);

Review comment:
       It could be a trick issue, please verify.
   It is possible that the caller pass accessor and propertyStore backed by different ZkClient. In this case, the accessor is used to update and the propertyStore is used to get context here. Given there might be a propagation latency on ZK server-side, this check here might be invalid. We need to set and read using the same ZkClient.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org