Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/20 18:26:02 UTC

helix git commit: Record workflow scheduling history in recurrent workflows.

Repository: helix
Updated Branches:
  refs/heads/master 3da3e319a -> f8ee313ee


Record workflow scheduling history in recurrent workflows.

Add a record of the scheduling history to the recurrent workflow's context.
When deleting a recurrent workflow, also remove all of its scheduled workflows that have finished.

Also add a test case for deleting recurrent workflows with scheduling history.
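For context, a minimal usage sketch (not part of the commit) of how a client could observe the new behavior through TaskDriver, assuming an already-connected HelixManager ("manager") and a recurrent queue named "myQueue" (both hypothetical):

    // Illustrative sketch only; "manager" and the queue name are assumptions.
    TaskDriver driver = new TaskDriver(manager);

    // The recurrent queue's context now keeps the full scheduling history,
    // not just the last scheduled run.
    WorkflowContext queueContext = driver.getWorkflowContext("myQueue");
    List<String> history = queueContext.getScheduledWorkflows();

    // Deleting the recurrent queue also marks every finished scheduled
    // workflow in that history for deletion; unfinished runs are left alone.
    driver.delete("myQueue");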


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f8ee313e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f8ee313e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f8ee313e

Branch: refs/heads/master
Commit: f8ee313ee5f6e0eb6fcbc584773f9ec3c1b01c6c
Parents: 3da3e31
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jul 28 17:14:39 2017 -0700
Committer: Jiajun Wang <jj...@linkedin.com>
Committed: Wed Sep 20 11:24:47 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  41 ++++--
 .../org/apache/helix/task/WorkflowContext.java  |  69 ++++++----
 .../integration/task/TestRecurringJobQueue.java | 134 ++++++++++++-------
 3 files changed, 154 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c922b18..d3dba5b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -329,13 +329,12 @@ public class TaskDriver {
     _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
 
     // Now atomically clear the results
-    path =
-        Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+    path = Joiner.on("/")
+        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
     updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+      @Override public ZNRecord update(ZNRecord currentData) {
+        Map<String, String> states =
+            currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
         if (states != null) {
           states.keySet().removeAll(toRemove);
         }
@@ -505,10 +504,10 @@ public class TaskDriver {
             .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
 
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
+      @Override public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
-          Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+          Map<String, String> states =
+              currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
           if (states != null && states.containsKey(namespacedJobName)) {
             states.keySet().remove(namespacedJobName);
           }
@@ -738,7 +737,21 @@ public class TaskDriver {
    * @param workflow
    */
   public void delete(String workflow) {
+    // After set DELETE state, rebalancer may remove the workflow instantly.
+    // So record context before set DELETE state.
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+
     setWorkflowTargetState(workflow, TargetState.DELETE);
+
+    // Delete all finished scheduled workflows.
+    if (wCtx != null && wCtx.getScheduledWorkflows() != null) {
+      for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) {
+        WorkflowContext scheduledWorkflowCtx = TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
+        if (scheduledWorkflowCtx != null && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+          setWorkflowTargetState(scheduledWorkflow, TargetState.DELETE);
+        }
+      }
+    }
   }
 
   /**
@@ -756,15 +769,17 @@ public class TaskDriver {
     }
   }
 
-  /** Helper function to change target state for a given workflow */
+  /**
+   * Helper function to change target state for a given workflow
+   */
   private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
     LOG.info("Set " + workflowName + " to target state " + state);
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
+      @Override public ZNRecord update(ZNRecord currentData) {
         if (currentData != null) {
           // Only update target state for non-completed workflows
-          String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
+          String finishTime = currentData
+              .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
           if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
             currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
                 state.name());

http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 9c1f77a..cc21ce3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -19,9 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
@@ -31,11 +29,14 @@ import org.apache.helix.ZNRecord;
  * property store
  */
 public class WorkflowContext extends HelixProperty {
-  public static final String WORKFLOW_STATE = "STATE";
-  public static final String START_TIME = "START_TIME";
-  public static final String FINISH_TIME = "FINISH_TIME";
-  public static final String JOB_STATES = "JOB_STATES";
-  public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW";
+  protected enum WorkflowContextProperties {
+    STATE,
+    START_TIME,
+    FINISH_TIME,
+    JOB_STATES,
+    LAST_SCHEDULED_WORKFLOW,
+    SCHEDULED_WORKFLOWS,
+  }
   public static final int UNSTARTED = -1;
   public static final int UNFINISHED = -1;
 
@@ -44,16 +45,18 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setWorkflowState(TaskState s) {
-    if (_record.getSimpleField(WORKFLOW_STATE) == null) {
-      _record.setSimpleField(WORKFLOW_STATE, s.name());
-    } else if (!_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.FAILED.name())
-        && !_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.COMPLETED.name())) {
-      _record.setSimpleField(WORKFLOW_STATE, s.name());
+    if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+      _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
+    } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
+        .equals(TaskState.FAILED.name()) && !_record
+        .getSimpleField(WorkflowContextProperties.STATE.name())
+        .equals(TaskState.COMPLETED.name())) {
+      _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
     }
   }
 
   public TaskState getWorkflowState() {
-    String s = _record.getSimpleField(WORKFLOW_STATE);
+    String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
     if (s == null) {
       return null;
     }
@@ -62,16 +65,16 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setJobState(String jobResource, TaskState s) {
-    Map<String, String> states = _record.getMapField(JOB_STATES);
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
-      states = new TreeMap<String, String>();
-      _record.setMapField(JOB_STATES, states);
+      states = new TreeMap<>();
+      _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
     }
     states.put(jobResource, s.name());
   }
 
   public TaskState getJobState(String jobResource) {
-    Map<String, String> states = _record.getMapField(JOB_STATES);
+    Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (states == null) {
       return null;
     }
@@ -85,8 +88,8 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public Map<String, TaskState> getJobStates() {
-    Map<String, TaskState> jobStates = new HashMap<String, TaskState>();
-    Map<String, String> stateFieldMap = _record.getMapField(JOB_STATES);
+    Map<String, TaskState> jobStates = new HashMap<>();
+    Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
     if (stateFieldMap != null) {
       for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
         jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -97,11 +100,11 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setStartTime(long t) {
-    _record.setSimpleField(START_TIME, String.valueOf(t));
+    _record.setSimpleField(WorkflowContextProperties.START_TIME.name(), String.valueOf(t));
   }
 
   public long getStartTime() {
-    String tStr = _record.getSimpleField(START_TIME);
+    String tStr = _record.getSimpleField(WorkflowContextProperties.START_TIME.name());
     if (tStr == null) {
       return -1;
     }
@@ -110,11 +113,11 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setFinishTime(long t) {
-    _record.setSimpleField(FINISH_TIME, String.valueOf(t));
+    _record.setSimpleField(WorkflowContextProperties.FINISH_TIME.name(), String.valueOf(t));
   }
 
   public long getFinishTime() {
-    String tStr = _record.getSimpleField(FINISH_TIME);
+    String tStr = _record.getSimpleField(WorkflowContextProperties.FINISH_TIME.name());
     if (tStr == null) {
       return UNFINISHED;
     }
@@ -122,11 +125,23 @@ public class WorkflowContext extends HelixProperty {
     return Long.parseLong(tStr);
   }
 
-  public void setLastScheduledSingleWorkflow(String wf) {
-    _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf);
+  public void setLastScheduledSingleWorkflow(String workflow) {
+    _record.setSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name(), workflow);
+    // Record scheduled workflow into the history list as well
+    List<String> workflows = getScheduledWorkflows();
+    if (workflows == null) {
+      workflows = new ArrayList<>();
+      _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
+    }
+    workflows.add(workflow);
   }
 
   public String getLastScheduledSingleWorkflow() {
-    return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW);
+    return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
   }
+
+  public List<String> getScheduledWorkflows() {
+    return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
+  }
+
 }
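
A minimal sketch (not part of the commit) of how the new SCHEDULED_WORKFLOWS history behaves on a locally constructed context; the record id and workflow names below are made up for illustration:

    // Illustrative only: exercising the new accessors without ZooKeeper.
    ZNRecord record = new ZNRecord("myRecurrentQueue");
    WorkflowContext context = new WorkflowContext(record);

    context.setLastScheduledSingleWorkflow("myRecurrentQueue_20170920T1100");
    context.setLastScheduledSingleWorkflow("myRecurrentQueue_20170920T1103");

    // LAST_SCHEDULED_WORKFLOW still points at the most recent run...
    assert context.getLastScheduledSingleWorkflow()
        .equals("myRecurrentQueue_20170920T1103");
    // ...while SCHEDULED_WORKFLOWS accumulates every scheduled run.
    assert context.getScheduledWorkflows().size() == 2;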

http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4983ed3..a1070d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -50,19 +50,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
-    // Create and Enqueue jobs
-    List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i <= 1; i++) {
-      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
-      JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
-      String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuild.enqueueJob(jobName, jobConfig);
-      currentJobNames.add(jobName);
-    }
+    List<String> currentJobNames = createAndEnqueueJob(queueBuild, 2);
 
     _driver.start(queueBuild.build());
 
@@ -79,17 +67,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
-    for (int i = 0; i <= 1; i++) {
-      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
-      JobConfig.Builder job =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
-      String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuilder.enqueueJob(jobName, job);
-      currentJobNames.add(jobName);
-    }
+    currentJobNames = createAndEnqueueJob(queueBuilder, 2);
 
     _driver.createQueue(queueBuilder.build());
 
@@ -115,20 +93,9 @@ public class TestRecurringJobQueue extends TaskTestBase {
     JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
-    List<String> currentJobNames = new ArrayList<String>();
     Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
     Thread.sleep(100);
-    for (int i = 0; i <= 4; i++) {
-      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
-      JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-          .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-          .setTargetPartitionStates(Sets.newHashSet(targetPartition));
-      String jobName = targetPartition.toLowerCase() + "Job" + i;
-      LOG.info("Enqueuing job: " + jobName);
-      queueBuilder.enqueueJob(jobName, job);
-      currentJobNames.add(i, jobName);
-    }
+    List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5);
     _driver.createQueue(queueBuilder.build());
 
     WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
@@ -258,19 +225,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000,
         TargetState.STOP);
-    // Create and Enqueue jobs
-    List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i <= 1; i++) {
-      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
-      JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
-      String jobName = targetPartition.toLowerCase() + "Job" + i;
-      queueBuild.enqueueJob(jobName, jobConfig);
-      currentJobNames.add(jobName);
-    }
+    createAndEnqueueJob(queueBuild, 2);
 
     _driver.createQueue(queueBuild.build());
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
@@ -287,6 +242,69 @@ public class TestRecurringJobQueue extends TaskTestBase {
   }
 
   @Test
+  public void testDeletingRecurrentQueueWithHistory() throws Exception {
+    final String queueName = TestHelper.getTestMethodName();
+    int intervalSeconds = 3;
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 60,
+        TargetState.STOP);
+    createAndEnqueueJob(queueBuild, 2);
+
+    _driver.createQueue(queueBuild.build());
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+    Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+    // reset interval to a smaller number so as to accelerate test
+    workflowConfig.putSimpleConfig(WorkflowConfig.WorkflowConfigProperty.RecurrenceInterval.name(),
+        "" + intervalSeconds);
+    _driver.updateWorkflow(queueName, workflowConfig);
+
+    _driver.resume(queueName);
+
+    WorkflowContext wCtx;
+    // wait until at least 2 workflows are scheduled based on template queue
+    do {
+      Thread.sleep(intervalSeconds);
+      wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    } while (wCtx.getScheduledWorkflows().size() < 2);
+
+    // Stop recurring workflow
+    _driver.stop(queueName);
+
+    // Record all scheduled workflows
+    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    final List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
+
+    // Delete recurrent workflow
+    _driver.delete(queueName);
+
+    // Wait until everything are cleaned up
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() throws Exception {
+        WorkflowContext currentQueueCtx = _driver.getWorkflowContext(queueName);
+        if (currentQueueCtx == null) {
+          // Queue is removed. Check the recorded scheduledWorkflows.
+          for (String workflow : scheduledWorkflows) {
+            if (_driver.getWorkflowContext(workflow) != null) {
+              return false;
+            }
+          }
+          return true;
+        } else {
+          // Queue is not removed yet, there might be update on the queue.
+          // Update the workflow list.
+          scheduledWorkflows.clear();
+          scheduledWorkflows.addAll(currentQueueCtx.getScheduledWorkflows());
+        }
+        return false;
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
+  }
+
+  @Test
   public void testGetNoExistWorkflowConfig() {
     String randomName = "randomJob";
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
@@ -297,7 +315,6 @@ public class TestRecurringJobQueue extends TaskTestBase {
     Assert.assertNull(workflowContext);
     JobContext jobContext = _driver.getJobContext(randomName);
     Assert.assertNull(jobContext);
-
   }
 
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
@@ -308,5 +325,22 @@ public class TestRecurringJobQueue extends TaskTestBase {
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
     TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName);
   }
+
+  private List<String> createAndEnqueueJob(JobQueue.Builder queueBuild, int jobCount) {
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < jobCount; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      queueBuild.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+    Assert.assertEquals(currentJobNames.size(), jobCount);
+    return currentJobNames;
+  }
 }