You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:19:49 UTC

[helix] 15/44: TASK: Add deleteJob namespaced job name support

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6b4ecc79cd0240071ce94d006c454be13d8b138e
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Apr 8 21:44:40 2019 -0700

    TASK: Add deleteJob namespaced job name support
    
    Currently, deletion of jobs from JobQueues only supports denamespaced job names. This makes it impossible for users to list all jobs and delete them, because they sometimes cannot recover the denamespaced names.
    Changelist:
    1. Add support for namespaced job names for deletion
    
    RB=1624395
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../java/org/apache/helix/task/TaskDriver.java     | 57 ++++++++++++----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index baa5467..5f4ac14 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -244,32 +244,40 @@ public class TaskDriver {
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
-   *
    * @param queue queue name
-   * @param job  job name
+   * @param job job name, denamespaced
    */
   public void deleteJob(final String queue, final String job) {
-    deleteJob(queue, job, false);
+    deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), false);
   }
 
   /**
    * Delete a job from an existing named queue,
    * the queue has to be stopped prior to this call
-   *
    * @param queue queue name
-   * @param job  job name
-   * @param forceDelete  CAUTION: if set true, all job's related zk nodes will
-   *                     be clean up from zookeeper even if its workflow information can not be found.
+   * @param job job name, denamespaced
+   * @param forceDelete
    */
   public void deleteJob(final String queue, final String job, boolean forceDelete) {
+    deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete);
+  }
+
+  /**
+   * Delete a job from an existing named queue,
+   * the queue has to be stopped prior to this call
+   * @param queue queue name
+   * @param job job name: namespaced job name
+   * @param forceDelete CAUTION: if set true, all job's related zk nodes will
+   *          be clean up from zookeeper even if its workflow information can not be found.
+   */
+  public void deleteNamespacedJob(final String queue, final String job, boolean forceDelete) {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
       if (forceDelete) {
         // remove all job znodes if its original workflow config was already gone.
         LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
-        boolean success = TaskUtil
-            .removeJob(_accessor, _propertyStore, TaskUtil.getNamespacedJobName(queue, job));
+        boolean success = TaskUtil.removeJob(_accessor, _propertyStore, job);
         if (!success) {
           LOG.info("Failed to delete job: " + job + " from queue: " + queue);
           throw new HelixException("Failed to delete job: " + job + " from queue: " + queue);
@@ -280,13 +288,14 @@ public class TaskDriver {
       return;
     }
 
-    if (workflowCfg.isTerminable()) {
+    if (!workflowCfg.isJobQueue()) {
       throw new IllegalArgumentException(queue + " is not a queue!");
     }
 
     boolean isRecurringWorkflow =
         (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
 
+    String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job);
     if (isRecurringWorkflow) {
       // delete job from the last scheduled queue if there exists one.
       WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
@@ -297,46 +306,42 @@ public class TaskDriver {
       if (lastScheduledQueue != null) {
         WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
         if (lastWorkflowCfg != null) {
-          deleteJobFromQueue(lastScheduledQueue, job);
+          deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
         }
       }
     }
-
-    deleteJobFromQueue(queue, job);
+    deleteJobFromQueue(queue, denamespacedJob);
   }
 
   /**
-   * delete a job from a scheduled (non-recurrent) queue.
-   *
+   * Delete a job from a scheduled (non-recurrent) queue.
    * @param queue
-   * @param job
+   * @param job this must be a denamespaced job name; it is namespaced with the queue name internally
    */
-
   private void deleteJobFromQueue(final String queue, final String job) {
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
-    String workflowState = (workflowCtx != null)
-        ? workflowCtx.getWorkflowState().name()
+    String workflowState = (workflowCtx != null) ? workflowCtx.getWorkflowState().name()
         : TaskState.NOT_STARTED.name();
 
     if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
       throw new IllegalStateException("Queue " + queue + " is still running!");
     }
 
-    if (workflowState.equals(TaskState.COMPLETED.name()) || workflowState.equals(
-        TaskState.FAILED.name()) || workflowState.equals(TaskState.ABORTED.name())) {
-      LOG.warn("Queue " + queue + " has already reached its final state, skip deleting job from it.");
+    if (workflowState.equals(TaskState.COMPLETED.name())
+        || workflowState.equals(TaskState.FAILED.name())
+        || workflowState.equals(TaskState.ABORTED.name())) {
+      LOG.warn(
+          "Queue " + queue + " has already reached its final state, skip deleting job from it.");
       return;
     }
 
-    String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-    Set<String> jobs = new HashSet<>(Arrays.asList(namespacedJobName));
-    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
+    if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue,
+        Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), true)) {
       LOG.error("Failed to delete job " + job + " from queue " + queue);
       throw new HelixException("Failed to delete job " + job + " from queue " + queue);
     }
   }
 
-
   /**
    * Adds a new job to the end an existing named queue.
    *