You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/10/06 06:37:34 UTC

[GitHub] [helix] alirezazamani opened a new pull request #1439: Implement addTask API

alirezazamani opened a new pull request #1439:
URL: https://github.com/apache/helix/pull/1439


   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   Fixes #1437 
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   In this PR, addTask API has been implemented which adds a new task to running (IN-PROGRESS) jobs.
   
   
   ### Tests
   
   - [x] The following tests are written for this issue:
   TestAddTask
   
   - [x] The following is the result of the "mvn test" command on the appropriate module:
   ```
   [INFO] Tests run: 1218, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4,345.519 s - in TestSuite
   [INFO] 
   [INFO] Results:
   [INFO] 
   [INFO] Tests run: 1218, Failures: 0, Errors: 0, Skipped: 0
   [INFO] 
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  01:12 h
   [INFO] Finished at: 2020-10-05T17:21:53-07:00
   [INFO] ------------------------------------------------------------------------
   ```
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504190982



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws Exception {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null. It means job has not been started. Hence task can
+      // be added to the job
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      // Null job state means the job has not started yet
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+      throw new HelixException(
+          String.format("Job %s is in illegal state to accept new task. Job State is %s",
+              nameSpaceJobName, jobState));
+    }
+    addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+  }
+
+  /**
+   * The helper method which check the workflow, job and task configs to determine if new task can
+   * be added to the job
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   */
+  private void validateAddTaskConfigs(String workflowName, String jobName, TaskConfig taskConfig) {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Workflow config for workflow %s does not exist!", workflowName));
+    }
+
+    if (jobConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Job config for job %s does not exist!", nameSpaceJobName));
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("TaskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException("Task cannot be added because taskID is null!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(String.format(
+          "Job %s is a targeted job. New task cannot be added to this job!", nameSpaceJobName));
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {

Review comment:
       Good suggestion. Changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500574998



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);

Review comment:
       The default timeout is 5 mins. It is pretty long. Shall we just provide one method that returns immediately and one that waits for a timeout?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =

Review comment:
       Please define this as a private static field of the class.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);

Review comment:
       It could be a trick issue, please verify.
   It is possible that the caller pass accessor and propertyStore backed by different ZkClient. In this case, the accessor is used to update and the propertyStore is used to get context here. Given there might be a propagation latency on ZK server-side, this check here might be invalid. We need to set and read using the same ZkClient.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501289249



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);

Review comment:
       I think the agreement in the design doc was to have a timeout to check if the task has been added to not. Also, we are exposing another API with a non-default timeout.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500473048



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+

Review comment:
       Can we wrap this logic to a separate (private) method, such as validateTaskConfig(). Also, you may want to put this check at the beginning of the method (to avoid non-necessary ZK read if the input itself is not valid).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504190399



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501355404



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);

Review comment:
       I don't think it is the case here. Here is how we initialize TaskDriver
   ```
     public TaskDriver(HelixManager manager) {
       this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
           manager.getHelixPropertyStore(), manager.getClusterName());
     }
   ```
   all of the input parameters are backed by the same manager.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502645764



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);

Review comment:
       However, the following public constructor is not in this case.
   
     public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
         HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500474062



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {

Review comment:
       Could the JobState be null when the job was just created (has not be scheduled by the controller)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501288753



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500474415



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");

Review comment:
       LOG.ERROR?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501222449



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);
+    }
+    throw new HelixException("An unexpected issue happened while task being added to the job!");

Review comment:
       Changed it to timeout exception.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501213585



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);

Review comment:
       This is the behaviour we are using in the other functions of the TaskDriver. I think we can stick to the same behaviour here as well. Let me know your thoughts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502677457



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet. Timeout for this
+   * operation is default timeout

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -77,6 +76,11 @@
   /** Default time out for monitoring workflow or job state */
   private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */
 
+  /** The illegal job states for the jobs to accept new task */
+  private static Set<TaskState> illegalJobStatesForTaskAddition = new HashSet<>(

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502646181



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -77,6 +76,11 @@
   /** Default time out for monitoring workflow or job state */
   private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */
 
+  /** The illegal job states for the jobs to accept new task */
+  private static Set<TaskState> illegalJobStatesForTaskAddition = new HashSet<>(

Review comment:
       nit, private final static... please
   Also, in this case, the var name should be ILLEGAL_JOB_STATES_FOR_TASK_ADDITION (or TASKS_MODIFICATION?)

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet. Timeout for this
+   * operation is default timeout

Review comment:
       "default timeout" is not clear enough.
   Please mention the real timeout here. Maybe use java doc feature to link with the variable would be a good idea. But I'm not sure if it works fine. please have a try.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet. Timeout for this
+   * operation is default timeout
+   * Note1: Task cannot be added if the job is in an illegal state. The states that job can accept
+   * new task is if the job is in progress or the job has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added ot not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet
+   * Note1: Task may cannot be added if the job is in an illegal state. The states that job can
+   * accept new task is if the job is in progress or the job has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added ot not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeout

Review comment:
       unit

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);

Review comment:
       If there are other places, then please don't hardcode the time.
   In addition, IMO, the timeout less than 1000ms does not make sense, so we shall reject the request. Obviously, we shall mention in the public API that timeout less than 1000ms means no wait, or just being rejected.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500469860



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout

Review comment:
       Add a task to a job, if a job is already completed, this operation has no effect?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501212264



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout

Review comment:
       Added not started also. Yes if it is completed, then the add operation will be failed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501223478



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {

Review comment:
       Added the ability to add a task to a job that is not started. If the job is not started or context is null, we can add tasks to the job.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502678253



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500471844



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");

Review comment:
       If the workflow has not started, the context does not exist, right?  But do we allow them to add a task to newly created workflow/job?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504190322



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,177 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500479505



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);
+    }
+    throw new HelixException("An unexpected issue happened while task being added to the job!");

Review comment:
       Should we throw a specific TimeoutException instead of a general exception. What is the caller expected to do if it is timeout, call another method to verify whether the task has been successfully added, or retry the addTask()?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500469320



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout

Review comment:
       Not necessarily a running job, people can add task to NOT_STARTED jobs too, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on pull request #1439:
URL: https://github.com/apache/helix/pull/1439#issuecomment-707985239


   This PR is ready to be merged.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500476813



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);

Review comment:
       What will happen if timeout is less than 1000ms?  You may want to adjust the sleep time here based on the given timeout?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501212516



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");

Review comment:
       Yes. Added this behavior.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r501212339



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502677255



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet. Timeout for this
+   * operation is default timeout
+   * Note1: Task cannot be added if the job is in an illegal state. The states that job can accept
+   * new task is if the job is in progress or the job has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added ot not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started yet
+   * Note1: Task may cannot be added if the job is in an illegal state. The states that job can
+   * accept new task is if the job is in progress or the job has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added ot not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeout

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.TimeoutException;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] pkuwm commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
pkuwm commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r503786356



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,177 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check whether the task has
+   * been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception

Review comment:
       Since Exception is pretty generic, I don't see a clear clue where it is from. Can you clarify in what case this Exception is thrown?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500477831



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)

Review comment:
       Please add more javaDoc here to describe 1) the behaviors and expectations (task may not be added in different senarios), 2) what does the timeout here mean? 3) The possible Exceptions (and what does each mean) could be thrown.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] NealSun96 commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504115658



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws Exception {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null. It means job has not been started. Hence task can
+      // be added to the job
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {

Review comment:
       This if condition is already covered by the one below. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502648326



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.TimeoutException;

Review comment:
       nit, format the imports?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500470981



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)

Review comment:
       I would suggest  to change the parameters order to this (String workflowName, String jobName, TaskConfig, timeout), just to follow the conventions in other methods in the TaskDriver.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] NealSun96 commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504115658



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws Exception {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null. It means job has not been started. Hence task can
+      // be added to the job
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {

Review comment:
       This if condition can be combined with the one below. 

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws Exception {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null. It means job has not been started. Hence task can
+      // be added to the job
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      // Null job state means the job has not started yet
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+      throw new HelixException(
+          String.format("Job %s is in illegal state to accept new task. Job State is %s",
+              nameSpaceJobName, jobState));
+    }
+    addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+  }
+
+  /**
+   * The helper method which check the workflow, job and task configs to determine if new task can
+   * be added to the job
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   */
+  private void validateAddTaskConfigs(String workflowName, String jobName, TaskConfig taskConfig) {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName);
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Workflow config for workflow %s does not exist!", workflowName));
+    }
+
+    if (jobConfig == null) {
+      throw new IllegalArgumentException(
+          String.format("Job config for job %s does not exist!", nameSpaceJobName));
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("TaskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException("Task cannot be added because taskID is null!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(String.format(
+          "Job %s is a targeted job. New task cannot be added to this job!", nameSpaceJobName));
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {

Review comment:
       nit: could combine into (taskConfig.getCommand() == null != jobConfig.getCommand() == null) and say "Command must exist in either job or task, not both". 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] lei-xia commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500477831



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String jobName, long timeout)

Review comment:
       Please add more javaDoc here to describe 1) the behaviors and expectations (task may not be added in different senarios), 2) what does the timeout here mean? and what the caller should do if it is timeout? 3) The possible Exceptions (and what does each mean) could be thrown.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] pkuwm commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
pkuwm commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504153922



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-

Review comment:
       It doesn't seem a general `Exception` is thrown, right? Since this is an API, to make the API signature clearer for users to use, instead of throwing a general `Exception`, shall we just throw an accurate exception that may be thrown: `TimeoutException`? Then it looks much clearer and easy to understand. RuntimeException doesn't need to be in a method signature.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r504190529



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +533,185 @@ public void enqueueJobs(final String queue, final List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this
+   * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT}
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or a job which has not started yet
+   * Note1: Task cannot be added if the job is in an illegal state. A job can accept
+   * new task if the job is in-progress or it has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for the new task should be unique. If not, this API throws an exception.
+   * Note4: In case of timeout exception, it is the user's responsibility to check whether the task
+   * has been successfully added or not.
+   * Note5: timeout is the time that this API checks whether the task has been successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeoutMs
+   * @throws Exception if there is an issue with the request or the operation. 1-
+   *           IllegalArgumentException will be thrown if the inputs are invalid. 2- HelixException
+   *           will be thrown if the job is not in the states to accept a new task or if there is
+   *           any issue in updating jobConfig. 3- TimeoutException will be thrown if the outcome of
+   *           the task addition is unknown and cannot be verified.
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs)
+      throws Exception {
+
+    if (timeoutMs < DEFAULT_SLEEP) {
+      throw new IllegalArgumentException(
+          String.format("Timeout is less than the minimum acceptable timeout value which is %s ms",
+              DEFAULT_SLEEP));
+    }
+
+    long endTime = System.currentTimeMillis() + timeoutMs;
+
+    validateAddTaskConfigs(workflowName, jobName, taskConfig);
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    JobContext jobContext = getJobContext(nameSpaceJobName);
+    if (workflowContext == null || jobContext == null) {
+      // Workflow context or job context is null. It means job has not been started. Hence task can
+      // be added to the job
+      addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+      return;
+    }
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {

Review comment:
       removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani merged pull request #1439: Implement addTask API

Posted by GitBox <gi...@apache.org>.
alirezazamani merged pull request #1439:
URL: https://github.com/apache/helix/pull/1439


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org