You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:34:09 UTC
helix git commit: [HELIX-687] Add synchronized delete for workflows
Repository: helix
Updated Branches:
refs/heads/master ccdc0dd7f -> 667b1887f
[HELIX-687] Add synchronized delete for workflows
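This commit adds TaskDriver.deleteAndWaitForCompletion(workflow, timeout). The method deletes a workflow (or job queue) and returns only after the deletion has completed, meaning its IdealState, WorkflowConfig, and WorkflowContext have all been removed. If that does not happen within the given timeout (in milliseconds), it throws a HelixException listing the components that remain. The pre-existing delete() makes no such guarantee; it may return before the operation has completed.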
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/667b1887
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/667b1887
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/667b1887
Branch: refs/heads/master
Commit: 667b1887f524fb499e472026f746bde4b5ceacca
Parents: ccdc0dd
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:49:43 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:32:30 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 60 ++++++++++++++++--
.../integration/task/TestDeleteWorkflow.java | 67 ++++++++++++++++++++
2 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/667b1887/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 44aa930..957333c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -257,12 +257,12 @@ public class TaskDriver {
deleteJobFromQueue(queue, job);
}
- /**
- * delete a job from a scheduled (non-recurrent) queue.
- *
- * @param queue
- * @param job
- */
+ /**
+ * delete a job from a scheduled (non-recurrent) queue.
+ *
+ * @param queue
+ * @param job
+ */
private void deleteJobFromQueue(final String queue, final String job) {
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
@@ -571,6 +571,52 @@ public class TaskDriver {
}
/**
+   * Deletes the given workflow (or job queue) and blocks until the deletion has fully completed.
+   * When this method returns normally, the workflow's IdealState, WorkflowConfig, and
+   * WorkflowContext have all been deleted.
+   *
+   * @param workflow workflow/jobqueue name
+   * @param timeout maximum time, in milliseconds, to wait for the delete operation to complete
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   * @throws HelixException if any of the components still exist once {@code timeout} has elapsed
+   */
+  public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException {
+    delete(workflow);
+    long endTime = System.currentTimeMillis() + timeout;
+    final long pollIntervalMs = 1000L;
+
+    // Paths used to check whether the delete has completed
+    BaseDataAccessor baseDataAccessor = _accessor.getBaseDataAccessor();
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    String idealStatePath = keyBuilder.idealStates(workflow).getPath();
+    String workflowConfigPath = keyBuilder.resourceConfig(workflow).getPath();
+    String workflowContextPath = keyBuilder.workflowContext(workflow).getPath();
+
+    String remaining = getRemainingWorkflowComponents(baseDataAccessor, idealStatePath,
+        workflowConfigPath, workflowContextPath);
+    while (!remaining.isEmpty()) {
+      long timeLeft = endTime - System.currentTimeMillis();
+      if (timeLeft <= 0) {
+        // Deletion did not finish in time: report exactly what the last check still found
+        throw new HelixException(String
+            .format("Failed to delete the workflow/queue %s within %d milliseconds. "
+                + "The following components still remain: %s", workflow, timeout, remaining));
+      }
+      // Never sleep past the deadline, so the call does not overrun the requested timeout
+      Thread.sleep(Math.min(pollIntervalMs, timeLeft));
+      remaining = getRemainingWorkflowComponents(baseDataAccessor, idealStatePath,
+          workflowConfigPath, workflowContextPath);
+    }
+  }
+
+  /**
+   * Checks which of a workflow's components still exist (with {@link AccessOption#PERSISTENT}).
+   *
+   * @return a space-separated list of the remaining component names (each followed by a space),
+   *         or an empty string when none of the paths exist
+   */
+  private static String getRemainingWorkflowComponents(BaseDataAccessor baseDataAccessor,
+      String idealStatePath, String workflowConfigPath, String workflowContextPath) {
+    StringBuilder remaining = new StringBuilder();
+    if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)) {
+      remaining.append("IdealState ");
+    }
+    if (baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)) {
+      remaining.append("WorkflowConfig ");
+    }
+    if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
+      remaining.append("WorkflowContext ");
+    }
+    return remaining.toString();
+  }
+
+ /**
* Helper function to change target state for a given workflow
*/
private void setWorkflowTargetState(String workflow, TargetState state) {
@@ -810,4 +856,4 @@ public class TaskDriver {
"Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/667b1887/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
new file mode 100644
index 0000000..91b7f32
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestDeleteWorkflow extends TaskTestBase {
+  // Timeout (ms) passed to each deleteAndWaitForCompletion call in this test
+  private static final int DELETE_DELAY = 3000;
+
+  private HelixAdmin admin;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    admin = _gSetupTool.getClusterManagementTool();
+    super.beforeClass();
+  }
+
+  /**
+   * Verifies that deleteAndWaitForCompletion throws a HelixException when the deletion cannot
+   * finish while the Controller is paused. It then checks that, once the Controller is resumed,
+   * the queue's WorkflowConfig, WorkflowContext, and IdealState are all removed.
+   */
+  @Test
+  public void testDeleteWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job1"), TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Pause the Controller so that the job queue won't get deleted
+    admin.enableCluster(CLUSTER_NAME, false);
+    try {
+      // Attempt the deletion and time out
+      try {
+        _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+        Assert.fail("Delete must time out and throw a HelixException with the Controller paused, but did not!");
+      } catch (HelixException expected) {
+        // Expected: the deletion cannot complete while the Controller is paused
+      }
+    } finally {
+      // Always resume the Controller, even if the assertion above fails, so the shared cluster
+      // is not left disabled for subsequent tests
+      admin.enableCluster(CLUSTER_NAME, true);
+    }
+
+    // With the Controller resumed, call delete again
+    _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+
+    // Check that the deletion operation completed
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+  }
+}
\ No newline at end of file