You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/20 18:26:02 UTC
helix git commit: Record workflow scheduling history in recurrent workflows.
Repository: helix
Updated Branches:
refs/heads/master 3da3e319a -> f8ee313ee
Record workflow scheduling history in recurrent workflows.
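Each recurrent workflow's context now keeps a list of every workflow it has scheduled, in addition to the last scheduled one.
When deleting a recurrent workflow, also remove all of its scheduled workflows that have already finished.
Also add a test case for deleting recurrent workflows that have a scheduling history.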
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f8ee313e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f8ee313e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f8ee313e
Branch: refs/heads/master
Commit: f8ee313ee5f6e0eb6fcbc584773f9ec3c1b01c6c
Parents: 3da3e31
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jul 28 17:14:39 2017 -0700
Committer: Jiajun Wang <jj...@linkedin.com>
Committed: Wed Sep 20 11:24:47 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 41 ++++--
.../org/apache/helix/task/WorkflowContext.java | 69 ++++++----
.../integration/task/TestRecurringJobQueue.java | 134 ++++++++++++-------
3 files changed, 154 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c922b18..d3dba5b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -329,13 +329,12 @@ public class TaskDriver {
_accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
// Now atomically clear the results
- path =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+ path = Joiner.on("/")
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+ @Override public ZNRecord update(ZNRecord currentData) {
+ Map<String, String> states =
+ currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
if (states != null) {
states.keySet().removeAll(toRemove);
}
@@ -505,10 +504,10 @@ public class TaskDriver {
.join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
+ @Override public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
- Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+ Map<String, String> states =
+ currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
if (states != null && states.containsKey(namespacedJobName)) {
states.keySet().remove(namespacedJobName);
}
@@ -738,7 +737,21 @@ public class TaskDriver {
* @param workflow
*/
public void delete(String workflow) {
+ // After set DELETE state, rebalancer may remove the workflow instantly.
+ // So record context before set DELETE state.
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+
setWorkflowTargetState(workflow, TargetState.DELETE);
+
+ // Delete all finished scheduled workflows.
+ if (wCtx != null && wCtx.getScheduledWorkflows() != null) {
+ for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) {
+ WorkflowContext scheduledWorkflowCtx = TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
+ if (scheduledWorkflowCtx != null && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+ setWorkflowTargetState(scheduledWorkflow, TargetState.DELETE);
+ }
+ }
+ }
}
/**
@@ -756,15 +769,17 @@ public class TaskDriver {
}
}
- /** Helper function to change target state for a given workflow */
+ /**
+ * Helper function to change target state for a given workflow
+ */
private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
LOG.info("Set " + workflowName + " to target state " + state);
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
+ @Override public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
// Only update target state for non-completed workflows
- String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
+ String finishTime = currentData
+ .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
state.name());
http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 9c1f77a..cc21ce3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -19,9 +19,7 @@ package org.apache.helix.task;
* under the License.
*/
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
@@ -31,11 +29,14 @@ import org.apache.helix.ZNRecord;
* property store
*/
public class WorkflowContext extends HelixProperty {
- public static final String WORKFLOW_STATE = "STATE";
- public static final String START_TIME = "START_TIME";
- public static final String FINISH_TIME = "FINISH_TIME";
- public static final String JOB_STATES = "JOB_STATES";
- public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW";
+ protected enum WorkflowContextProperties {
+ STATE,
+ START_TIME,
+ FINISH_TIME,
+ JOB_STATES,
+ LAST_SCHEDULED_WORKFLOW,
+ SCHEDULED_WORKFLOWS,
+ }
public static final int UNSTARTED = -1;
public static final int UNFINISHED = -1;
@@ -44,16 +45,18 @@ public class WorkflowContext extends HelixProperty {
}
public void setWorkflowState(TaskState s) {
- if (_record.getSimpleField(WORKFLOW_STATE) == null) {
- _record.setSimpleField(WORKFLOW_STATE, s.name());
- } else if (!_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.FAILED.name())
- && !_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.COMPLETED.name())) {
- _record.setSimpleField(WORKFLOW_STATE, s.name());
+ if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+ _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
+ } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
+ .equals(TaskState.FAILED.name()) && !_record
+ .getSimpleField(WorkflowContextProperties.STATE.name())
+ .equals(TaskState.COMPLETED.name())) {
+ _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
}
}
public TaskState getWorkflowState() {
- String s = _record.getSimpleField(WORKFLOW_STATE);
+ String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
if (s == null) {
return null;
}
@@ -62,16 +65,16 @@ public class WorkflowContext extends HelixProperty {
}
public void setJobState(String jobResource, TaskState s) {
- Map<String, String> states = _record.getMapField(JOB_STATES);
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
- states = new TreeMap<String, String>();
- _record.setMapField(JOB_STATES, states);
+ states = new TreeMap<>();
+ _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
}
states.put(jobResource, s.name());
}
public TaskState getJobState(String jobResource) {
- Map<String, String> states = _record.getMapField(JOB_STATES);
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
return null;
}
@@ -85,8 +88,8 @@ public class WorkflowContext extends HelixProperty {
}
public Map<String, TaskState> getJobStates() {
- Map<String, TaskState> jobStates = new HashMap<String, TaskState>();
- Map<String, String> stateFieldMap = _record.getMapField(JOB_STATES);
+ Map<String, TaskState> jobStates = new HashMap<>();
+ Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (stateFieldMap != null) {
for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -97,11 +100,11 @@ public class WorkflowContext extends HelixProperty {
}
public void setStartTime(long t) {
- _record.setSimpleField(START_TIME, String.valueOf(t));
+ _record.setSimpleField(WorkflowContextProperties.START_TIME.name(), String.valueOf(t));
}
public long getStartTime() {
- String tStr = _record.getSimpleField(START_TIME);
+ String tStr = _record.getSimpleField(WorkflowContextProperties.START_TIME.name());
if (tStr == null) {
return -1;
}
@@ -110,11 +113,11 @@ public class WorkflowContext extends HelixProperty {
}
public void setFinishTime(long t) {
- _record.setSimpleField(FINISH_TIME, String.valueOf(t));
+ _record.setSimpleField(WorkflowContextProperties.FINISH_TIME.name(), String.valueOf(t));
}
public long getFinishTime() {
- String tStr = _record.getSimpleField(FINISH_TIME);
+ String tStr = _record.getSimpleField(WorkflowContextProperties.FINISH_TIME.name());
if (tStr == null) {
return UNFINISHED;
}
@@ -122,11 +125,23 @@ public class WorkflowContext extends HelixProperty {
return Long.parseLong(tStr);
}
- public void setLastScheduledSingleWorkflow(String wf) {
- _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf);
+ public void setLastScheduledSingleWorkflow(String workflow) {
+ _record.setSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name(), workflow);
+ // Record scheduled workflow into the history list as well
+ List<String> workflows = getScheduledWorkflows();
+ if (workflows == null) {
+ workflows = new ArrayList<>();
+ _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
+ }
+ workflows.add(workflow);
}
public String getLastScheduledSingleWorkflow() {
- return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW);
+ return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
}
+
+ public List<String> getScheduledWorkflows() {
+ return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4983ed3..a1070d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -50,19 +50,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
- // Create and Enqueue jobs
- List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i <= 1; i++) {
- String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
- JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
- String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuild.enqueueJob(jobName, jobConfig);
- currentJobNames.add(jobName);
- }
+ List<String> currentJobNames = createAndEnqueueJob(queueBuild, 2);
_driver.start(queueBuild.build());
@@ -79,17 +67,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
currentJobNames.clear();
- for (int i = 0; i <= 1; i++) {
- String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
- JobConfig.Builder job =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
- String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuilder.enqueueJob(jobName, job);
- currentJobNames.add(jobName);
- }
+ currentJobNames = createAndEnqueueJob(queueBuilder, 2);
_driver.createQueue(queueBuilder.build());
@@ -115,20 +93,9 @@ public class TestRecurringJobQueue extends TaskTestBase {
JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
// Create and Enqueue jobs
- List<String> currentJobNames = new ArrayList<String>();
Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
Thread.sleep(100);
- for (int i = 0; i <= 4; i++) {
- String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
- JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
- String jobName = targetPartition.toLowerCase() + "Job" + i;
- LOG.info("Enqueuing job: " + jobName);
- queueBuilder.enqueueJob(jobName, job);
- currentJobNames.add(i, jobName);
- }
+ List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5);
_driver.createQueue(queueBuilder.build());
WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
@@ -258,19 +225,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
LOG.info("Starting job-queue: " + queueName);
JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000,
TargetState.STOP);
- // Create and Enqueue jobs
- List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i <= 1; i++) {
- String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
- JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
- String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuild.enqueueJob(jobName, jobConfig);
- currentJobNames.add(jobName);
- }
+ createAndEnqueueJob(queueBuild, 2);
_driver.createQueue(queueBuild.build());
WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
@@ -287,6 +242,69 @@ public class TestRecurringJobQueue extends TaskTestBase {
}
@Test
+ public void testDeletingRecurrentQueueWithHistory() throws Exception {
+ final String queueName = TestHelper.getTestMethodName();
+ int intervalSeconds = 3;
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 60,
+ TargetState.STOP);
+ createAndEnqueueJob(queueBuild, 2);
+
+ _driver.createQueue(queueBuild.build());
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+ // reset interval to a smaller number so as to accelerate test
+ workflowConfig.putSimpleConfig(WorkflowConfig.WorkflowConfigProperty.RecurrenceInterval.name(),
+ "" + intervalSeconds);
+ _driver.updateWorkflow(queueName, workflowConfig);
+
+ _driver.resume(queueName);
+
+ WorkflowContext wCtx;
+ // wait until at least 2 workflows are scheduled based on template queue
+ do {
+ Thread.sleep(intervalSeconds);
+ wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+ } while (wCtx.getScheduledWorkflows().size() < 2);
+
+ // Stop recurring workflow
+ _driver.stop(queueName);
+
+ // Record all scheduled workflows
+ wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+ final List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
+
+ // Delete recurrent workflow
+ _driver.delete(queueName);
+
+ // Wait until everything are cleaned up
+ boolean result = TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() throws Exception {
+ WorkflowContext currentQueueCtx = _driver.getWorkflowContext(queueName);
+ if (currentQueueCtx == null) {
+ // Queue is removed. Check the recorded scheduledWorkflows.
+ for (String workflow : scheduledWorkflows) {
+ if (_driver.getWorkflowContext(workflow) != null) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ // Queue is not removed yet, there might be update on the queue.
+ // Update the workflow list.
+ scheduledWorkflows.clear();
+ scheduledWorkflows.addAll(currentQueueCtx.getScheduledWorkflows());
+ }
+ return false;
+ }
+ }, 5 * 1000);
+ Assert.assertTrue(result);
+ }
+
+ @Test
public void testGetNoExistWorkflowConfig() {
String randomName = "randomJob";
WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
@@ -297,7 +315,6 @@ public class TestRecurringJobQueue extends TaskTestBase {
Assert.assertNull(workflowContext);
JobContext jobContext = _driver.getJobContext(randomName);
Assert.assertNull(jobContext);
-
}
private void verifyJobDeleted(String queueName, String jobName) throws Exception {
@@ -308,5 +325,22 @@ public class TestRecurringJobQueue extends TaskTestBase {
Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName);
}
+
+ private List<String> createAndEnqueueJob(JobQueue.Builder queueBuild, int jobCount) {
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < jobCount; i++) {
+ String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ String jobName = targetPartition.toLowerCase() + "Job" + i;
+ queueBuild.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+ Assert.assertEquals(currentJobNames.size(), jobCount);
+ return currentJobNames;
+ }
}