You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:34:09 UTC

helix git commit: [HELIX-687] Add synchronized delete for workflows

Repository: helix
Updated Branches:
  refs/heads/master ccdc0dd7f -> 667b1887f


[HELIX-687] Add synchronized delete for workflows

This commit adds the method deleteAndWaitForCompletion, which deletes a workflow and returns only after the delete operation has completed, or throws a HelixException if the deletion has not completed within the given timeout. The pre-existing delete does not guarantee that the operation is complete when it returns; deleteAndWaitForCompletion does.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/667b1887
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/667b1887
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/667b1887

Branch: refs/heads/master
Commit: 667b1887f524fb499e472026f746bde4b5ceacca
Parents: ccdc0dd
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:49:43 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:32:30 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 60 ++++++++++++++++--
 .../integration/task/TestDeleteWorkflow.java    | 67 ++++++++++++++++++++
 2 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/667b1887/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 44aa930..957333c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -257,12 +257,12 @@ public class TaskDriver {
     deleteJobFromQueue(queue, job);
   }
 
-    /**
-     * delete a job from a scheduled (non-recurrent) queue.
-     *
-     * @param queue
-     * @param job
-     */
+  /**
+   * delete a job from a scheduled (non-recurrent) queue.
+   *
+   * @param queue
+   * @param job
+   */
 
   private void deleteJobFromQueue(final String queue, final String job) {
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
@@ -571,6 +571,52 @@ public class TaskDriver {
   }
 
   /**
+   * Synchronous (blocking) delete: deletes the workflow and waits, up to the given timeout, for
+   * the deletion to fully complete. A normal return means the queue (workflow) is completely
+   * gone, i.e. its IdealState, WorkflowConfig, and WorkflowContext have all been deleted.
+   *
+   * @param workflow workflow/jobqueue name
+   * @param timeout maximum time, in milliseconds, to wait for the delete operation to complete
+   */
+  public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException {
+    delete(workflow);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    // For checking whether delete completed
+    BaseDataAccessor baseDataAccessor = _accessor.getBaseDataAccessor();
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+
+    String idealStatePath = keyBuilder.idealStates(workflow).getPath();
+    String workflowConfigPath = keyBuilder.resourceConfig(workflow).getPath();
+    String workflowContextPath = keyBuilder.workflowContext(workflow).getPath();
+
+    while (System.currentTimeMillis() <= endTime) {
+      if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)
+          || baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)
+          || baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
+        Thread.sleep(1000);
+      } else {
+        return;
+      }
+    }
+
+    // Deletion failed: check which step of deletion failed to complete and create an error message
+    StringBuilder failed = new StringBuilder();
+    if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)) {
+      failed.append("IdealState ");
+    }
+    if (baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)) {
+      failed.append("WorkflowConfig ");
+    }
+    if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
+      failed.append("WorkflowContext ");
+    }
+    throw new HelixException(String
+        .format("Failed to delete the workflow/queue %s within %d milliseconds. "
+            + "The following components still remain: %s", workflow, timeout, failed.toString()));
+  }
+
+  /**
    * Helper function to change target state for a given workflow
    */
   private void setWorkflowTargetState(String workflow, TargetState state) {
@@ -810,4 +856,4 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/667b1887/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
new file mode 100644
index 0000000..91b7f32
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDeleteWorkflow extends TaskTestBase  {
+  private static final int DELETE_DELAY = 3000;
+
+  private HelixAdmin admin;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    admin = _gSetupTool.getClusterManagementTool();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testDeleteWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job1"), TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Pause the Controller so that the job queue won't get deleted
+    admin.enableCluster(CLUSTER_NAME, false);
+
+    // Attempt the deletion and time out
+    try {
+      _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+      Assert.fail("Delete must time out and throw a HelixException with the Controller paused, but did not!");
+    } catch (HelixException e) {
+      // Expected: the delete cannot complete while the Controller is paused, so it times out
+    }
+
+    // Resume the Controller and call delete again
+    admin.enableCluster(CLUSTER_NAME, true);
+    _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+
+    // Check that the deletion operation completed
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+  }
+}
\ No newline at end of file