Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/23 21:52:51 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-744] Support cancellation of a Helix workflow via a DELETE Spec.

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b450a07  [GOBBLIN-744] Support cancellation of a Helix workflow via a DELETE Spec.
b450a07 is described below

commit b450a077bdb03f42afa9cb8d674159f5b4d507d4
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Apr 23 14:52:43 2019 -0700

    [GOBBLIN-744] Support cancellation of a Helix workflow via a DELETE Spec.
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [x] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        -
    https://issues.apache.org/jira/browse/GOBBLIN-744
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    This PR adds the ability to interrupt and
    cancel a running job on a Gobblin Helix
    cluster via a DELETE Spec submitted to the
    JobConfigurationManager. To cancel a running
    job, the DELETE Spec should set
    "gobblin.cluster.job.cancelRunningJobOnDelete"
    (the key introduced in this change) to true.
    The default behavior is to simply delete the
    corresponding JobSpec from the JobCatalog.
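
    For illustration, a DELETE Spec carrying the
    cancel flag can be authored the same way the
    new test suite writes its Specs. This is a
    sketch modeled on addJobSpec() in
    IntegrationJobRestartViaSpecSuite below; the
    spec URI and description are hypothetical:

        Map<String, String> metadata = new HashMap<>();
        metadata.put(FsSpecConsumer.VERB_KEY, SpecExecutor.Verb.DELETE.name());

        Map<String, String> props = new HashMap<>();
        props.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");

        AvroJobSpec deleteSpec = AvroJobSpec.newBuilder()
            .setUri("myJob")                 // hypothetical spec URI
            .setProperties(props)
            .setMetadata(metadata)
            .setTemplateUri("FS:///")
            .setDescription("cancel myJob")  // hypothetical description
            .setVersion("1")
            .build();
        // Serialize deleteSpec as an Avro data file into the directory watched
        // by FsSpecConsumer (see addJobSpec() in the suite below for the steps).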
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    IntegrationJobRestartViaSpecSuite and
    ClusterIntegrationTest.
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    GOBBLIN-744: Support cancellation of a Helix
    workflow via a DELETE Spec.
    
    GOBBLIN-744: Address reviewer comments.
    
    GOBBLIN-744: Address reviewer comments.
    
    GOBBLIN-744: Addressed reviewer comments.
    
    Closes #2609 from sv2000/helixCancel
---
 .../FsScheduledJobConfigurationManager.java        |  24 ++--
 .../cluster/GobblinClusterConfigurationKeys.java   |   2 +
 .../gobblin/cluster/GobblinHelixJobScheduler.java  |  37 +++++-
 .../org/apache/gobblin/cluster/SleepingTask.java   |  35 ++++-
 .../gobblin/cluster/ClusterIntegrationTest.java    | 129 +++++++++++++++++--
 .../cluster/suite/IntegrationJobCancelSuite.java   |   4 +-
 .../suite/IntegrationJobRestartViaSpecSuite.java   | 142 +++++++++++++++++++++
 .../apache/gobblin/runtime/api/FsSpecConsumer.java |  11 +-
 8 files changed, 357 insertions(+), 27 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
index 21db2ff..e4776ef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
@@ -18,7 +18,6 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -31,7 +30,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.runtime.api.FsSpecConsumer;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
@@ -53,27 +51,31 @@ public class FsScheduledJobConfigurationManager extends ScheduledJobConfiguratio
 
   @Override
   protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
-    List<Pair<SpecExecutor.Verb, Spec>> jobSpecs =
-        (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
+    List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
+        (List<Pair<SpecExecutor.Verb, JobSpec>>) this._specConsumer.changedSpecs().get();
 
-    for (Pair<SpecExecutor.Verb, Spec> entry : jobSpecs) {
-      Spec spec = entry.getValue();
+    for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
+      JobSpec jobSpec = entry.getValue();
       SpecExecutor.Verb verb = entry.getKey();
-      if (verb.equals(SpecExecutor.Verb.ADD) || verb.equals(SpecExecutor.Verb.UPDATE)) {
+      if (verb.equals(SpecExecutor.Verb.ADD)) {
         // Handle addition
-        JobSpec jobSpec = (JobSpec) spec;
         this._jobCatalog.put(jobSpec);
         postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
+      } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
+        //Handle update.
+        //Overwrite the jobSpec in the jobCatalog and post an UpdateJobConfigArrivalEvent.
+        this._jobCatalog.put(jobSpec);
+        postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
       } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
         // Handle delete
-        this._jobCatalog.remove(spec.getUri());
-        postDeleteJobConfigArrival(spec.getUri().toString(), new Properties());
+        this._jobCatalog.remove(jobSpec.getUri());
+        postDeleteJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
       }
 
       try {
         //Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
         //SpecConsumer can delete the JobSpec.
-        this._specConsumer.commit(spec);
+        this._specConsumer.commit(jobSpec);
       } catch (IOException e) {
         log.error("Error when committing to FsSpecConsumer: ", e);
       }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8b33e4e..d1267c5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -159,4 +159,6 @@ public class GobblinClusterConfigurationKeys {
   public static final String KILL_DUPLICATE_PLANNING_JOB = GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob";
   public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true;
 
+  public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
+  public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
 }
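
Assuming GOBBLIN_CLUSTER_PREFIX resolves to "gobblin.cluster." (its
conventional value), the fully-resolved key a DELETE Spec must set is:

    gobblin.cluster.job.cancelRunningJobOnDelete=true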
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 79b088b..659ff63 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +39,7 @@ import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.Striped;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
@@ -96,6 +98,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
   final Striped<Lock> locks = Striped.lazyWeakLock(256);
 
   private boolean startServicesCompleted;
+  private final long helixJobStopTimeoutMillis;
 
   public GobblinHelixJobScheduler(Properties properties,
                                   HelixManager jobHelixManager,
@@ -116,7 +119,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     this.jobCatalog = jobCatalog;
     this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
 
-    int metricsWindowSizeInMin = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(this.properties),
+    Config jobConfig = ConfigUtils.propertiesToConfig(this.properties);
+    int metricsWindowSizeInMin = ConfigUtils.getInt(jobConfig,
                                                     ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
                                                     ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
 
@@ -141,6 +145,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
                                                   metricsWindowSizeInMin);
 
     this.startServicesCompleted = false;
+
+    this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
   }
 
   @Override
@@ -315,6 +322,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     } catch (Exception je) {
       LOGGER.error("Failed to update job " + updateJobArrival.getJobName(), je);
     }
+
+    //Wait until the cancelled job is complete.
+    waitForJobCompletion(updateJobArrival.getJobName());
+
     try {
       handleNewJobConfigArrival(new NewJobConfigArrivalEvent(updateJobArrival.getJobName(),
           updateJobArrival.getJobConfig()));
@@ -323,16 +334,38 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     }
   }
 
+  private void waitForJobCompletion(String jobName) {
+    while (this.jobRunningMap.getOrDefault(jobName, false)) {
+      LOGGER.info("Waiting for job {} to stop...", jobName);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted exception encountered: ", e);
+      }
+    }
+  }
+
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) {
+  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
     LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
     try {
       unscheduleJob(deleteJobArrival.getJobName());
+      cancelJobIfRequired(deleteJobArrival);
     } catch (JobException je) {
       LOGGER.error("Failed to unschedule job " + deleteJobArrival.getJobName());
     }
   }
 
+  private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+    Properties jobConfig = deleteJobArrival.getJobConfig();
+    if (PropertiesUtils.getPropAsBoolean(jobConfig, GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
+        GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE)) {
+      LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
+      TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
+      taskDriver.waitToStop(deleteJobArrival.getJobName(), this.helixJobStopTimeoutMillis);
+      LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+    }
+  }
+
   /**
    * This class is responsible for running non-scheduled jobs.
    */
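
The cancellation path above delegates to Helix's TaskDriver. In isolation,
stopping a workflow looks like the sketch below, assuming an already-connected
HelixManager; the workflow name and timeout are illustrative:

    // Ask Helix to move the workflow's TargetState to STOP and block until
    // the transition completes or the timeout elapses.
    TaskDriver taskDriver = new TaskDriver(helixManager);
    taskDriver.waitToStop("job_HelloWorldTestJob_1234", 60_000L); // throws InterruptedException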
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
index 55750c0..8d06b70 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
@@ -17,24 +17,52 @@
 
 package org.apache.gobblin.cluster;
 
-import avro.shaded.com.google.common.base.Throwables;
+import java.io.File;
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.task.BaseAbstractTask;
 
 @Slf4j
 public class SleepingTask extends BaseAbstractTask {
+  public static final String TASK_STATE_FILE_KEY = "task.state.file.path";
+
   private final long sleepTime;
+  private File taskStateFile;
 
   public SleepingTask(TaskContext taskContext) {
     super(taskContext);
-    sleepTime = taskContext.getTaskState().getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+    TaskState taskState = taskContext.getTaskState();
+    sleepTime = taskState.getPropAsLong("data.publisher.sleep.time.in.seconds", 10L);
+    taskStateFile = new File(taskState.getProp(TASK_STATE_FILE_KEY));
+    try {
+      if (taskStateFile.exists()) {
+        if (!taskStateFile.delete()) {
+          log.error("Unable to delete {}", taskStateFile);
+          throw new IOException("File Delete Exception");
+        }
+      } else {
+        Files.createParentDirs(taskStateFile);
+      }
+    } catch (IOException e) {
+      log.error("Unable to create directory: ", taskStateFile.getParent());
+      Throwables.propagate(e);
+    }
+    taskStateFile.deleteOnExit();
   }
 
   @Override
   public void run() {
     try {
+      if (!taskStateFile.createNewFile()) {
+        throw new IOException("File creation error: " + taskStateFile.getName());
+      }
       long endTime = System.currentTimeMillis() + sleepTime * 1000;
       while (System.currentTimeMillis() <= endTime) {
         Thread.sleep(1000L);
@@ -46,6 +74,9 @@ public class SleepingTask extends BaseAbstractTask {
       log.error("Sleep interrupted.");
       Thread.currentThread().interrupt();
       Throwables.propagate(e);
+    } catch (IOException e) {
+      log.error("IOException encountered when creating {}", taskStateFile.getName(), e);
+      Throwables.propagate(e);
     }
   }
 }
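
SleepingTask's task-state file gives the integration tests a cheap liveness
signal: the file is created when run() starts and removed on exit. The tests
below poll for it with AssertWithBackoff; a plain-Java stand-in for that
polling (helper name hypothetical) would be:

    // Poll for a marker file until it appears or the deadline passes.
    static boolean waitForFile(File file, long timeoutMs) throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        if (file.exists()) {
          return true;
        }
        Thread.sleep(100L);
      }
      return file.exists();
    }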
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 9d5d602..7e810c3 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -17,15 +17,25 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ChainedPathZkSerializer;
+import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
@@ -35,14 +45,19 @@ import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
 import org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobCancelSuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
+import org.apache.gobblin.cluster.suite.IntegrationJobRestartViaSpecSuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
 import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
+
 @Slf4j
 public class ClusterIntegrationTest {
 
   private IntegrationBasicSuite suite;
+  private String zkConnectString;
 
   @Test
   public void testJobShouldComplete()
@@ -51,28 +66,31 @@ public class ClusterIntegrationTest {
     runAndVerify();
   }
 
-  @Test void testJobShouldGetCancelled() throws Exception {
-    this.suite =new IntegrationJobCancelSuite();
+  private HelixManager getHelixManager() {
     Config helixConfig = this.suite.getManagerConfig();
     String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
     String instanceName = ConfigUtils.getString(helixConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
         GobblinClusterManager.class.getSimpleName());
-    String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    this.zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.CONTROLLER, zkConnectString);
+    return helixManager;
+  }
 
+  @Test void testJobShouldGetCancelled() throws Exception {
+    this.suite = new IntegrationJobCancelSuite();
+    HelixManager helixManager = getHelixManager();
     suite.startCluster();
-
     helixManager.connect();
 
     TaskDriver taskDriver = new TaskDriver(helixManager);
 
-    while (TaskDriver.getWorkflowContext(helixManager, IntegrationJobCancelSuite.JOB_ID) == null) {
-      log.warn("Waiting for the job to start...");
-      Thread.sleep(1000L);
-    }
+    //Ensure that Helix has created a workflow
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
 
-    // Give the job some time to reach writer, where it sleeps
-    Thread.sleep(2000L);
+    //Ensure that the SleepingTask is running
+    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+        assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
 
     log.info("Stopping the job");
     taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
@@ -82,6 +100,97 @@ public class ClusterIntegrationTest {
     suite.waitForAndVerifyOutputFiles();
   }
 
+  /**
+   * An integration test for restarting a Helix workflow via a JobSpec. This test case starts a Helix cluster with
+   * a {@link FsScheduledJobConfigurationManager}. The test case does the following:
+   * <ul>
+   *   <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a {@link org.apache.gobblin.cluster.SleepingCustomTaskSource})
+   *   to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.  which is picked by the JobConfigurationManager. </li>
+   *   <li> the JobConfigurationManager sends a notification to the GobblinHelixJobScheduler which schedules the job for execution. The JobSpec is
+   *   also added to the JobCatalog for persistence. Helix starts a Workflow for this JobSpec. </li>
+   *   <li> We then add a {@link org.apache.gobblin.runtime.api.JobSpec} with UPDATE Verb to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
+   *   This signals GobblinHelixJobScheduler (and, Helix) to first cancel the running job (i.e., Helix Workflow) started in the previous step.
+   *   <li> We inspect the state of the zNode corresponding to the Workflow resource in Zookeeper to ensure that its {@link org.apache.helix.task.TargetState}
+   *   is STOP. </li>
+   *   <li> Once the cancelled job from the previous steps is completed, the job will be re-launched for execution by the GobblinHelixJobScheduler.
+   *   We confirm the execution by again inspecting the zNode and ensuring its TargetState is START. </li>
+   * </ul>
+   */
+  @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+  public void testJobRestartViaSpec() throws Exception {
+    this.suite = new IntegrationJobRestartViaSpecSuite();
+    HelixManager helixManager = getHelixManager();
+
+    IntegrationJobRestartViaSpecSuite restartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite;
+
+    //Add a new JobSpec to the path monitored by the SpecConsumer
+    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, SpecExecutor.Verb.ADD.name());
+
+    //Start the cluster
+    restartViaSpecSuite.startCluster();
+
+    helixManager.connect();
+
+    AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
+        assertTrue(isTaskStarted(helixManager, IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
+
+    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+        assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
+
+    ZkClient zkClient = new ZkClient(this.zkConnectString);
+    PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
+    zkClient.setZkSerializer(zkSerializer);
+
+    String clusterName = getHelixManager().getClusterName();
+    String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE", IntegrationJobRestartViaSpecSuite.JOB_ID).toString();
+
+    //Ensure that the Workflow is started
+    ZNRecord record = zkClient.readData(zNodePath);
+    String targetState = record.getSimpleField("TargetState");
+    Assert.assertEquals(targetState, TargetState.START.name());
+
+    //Add a JobSpec with UPDATE verb signalling the Helix cluster to restart the workflow
+    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, SpecExecutor.Verb.UPDATE.name());
+
+    AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input -> {
+      //Inspect the zNode at the path corresponding to the Workflow resource. Ensure the target state of the resource is in
+      // the STOP state or that the zNode has been deleted.
+      ZNRecord recordNew = zkClient.readData(zNodePath, true);
+      String targetStateNew = null;
+      if (recordNew != null) {
+        targetStateNew = recordNew.getSimpleField("TargetState");
+      }
+      return recordNew == null || targetStateNew.equals(TargetState.STOP.name());
+    }, "Waiting for Workflow TargetState to be STOP");
+
+    //Ensure that the SleepingTask did not terminate normally, i.e., that it was interrupted. We check this by ensuring
+    // that the line "Hello World!" is not present in the logged output.
+    suite.waitForAndVerifyOutputFiles();
+
+    AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(120000).backoffFactor(1).assertTrue(input -> {
+      //Inspect the zNode at the path corresponding to the Workflow resource. Ensure the target state of the resource is in
+      // the START state.
+      ZNRecord recordNew = zkClient.readData(zNodePath, true);
+      String targetStateNew = null;
+      if (recordNew != null) {
+        targetStateNew = recordNew.getSimpleField("TargetState");
+        return targetStateNew.equals(TargetState.START.name());
+      }
+      return false;
+    }, "Waiting for Workflow TargetState to be START");
+  }
+
+  private Predicate<Void> isTaskStarted(HelixManager helixManager, String jobId) {
+    return input -> TaskDriver.getWorkflowContext(helixManager, jobId) != null;
+  }
+
+  private Predicate<Void> isTaskRunning(String taskStateFileName) {
+    return input -> {
+      File taskStateFile = new File(taskStateFileName);
+      return taskStateFile.exists();
+    };
+  }
+
   @Test
   public void testSeparateProcessMode()
       throws Exception {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index 7961bf8..62b3634 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -26,11 +26,13 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.SleepingTask;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
   public static final String JOB_ID = "job_HelloWorldTestJob_1234";
+  public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING";
 
   @Override
   protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
@@ -38,7 +40,7 @@ public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
         ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
         ConfigurationKeys.JOB_ID_KEY, JOB_ID,
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L))
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
         .withFallback(rawJobConfig);
     return ImmutableMap.of("HelloWorldJob", newConfig);
   }
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
new file mode 100644
index 0000000..eb47536
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.FsScheduledJobConfigurationManager;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.SleepingTask;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite {
+  public static final String JOB_ID = "job_HelloWorldTestJob_1235";
+  public static final String JOB_CATALOG_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
+  public static final String FS_SPEC_CONSUMER_DIR = "/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
+  public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
+
+  public IntegrationJobRestartViaSpecSuite() throws IOException {
+    super();
+    Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    if (!fs.exists(jobCatalogDirPath)) {
+      fs.mkdirs(jobCatalogDirPath);
+    }
+  }
+
+  private Map<String,String> getJobConfig() throws IOException {
+    try (InputStream resourceStream = Resources.getResource(JOB_CONF_NAME).openStream()) {
+      Reader reader = new InputStreamReader(resourceStream);
+      Config rawJobConfig =
+          ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
+      rawJobConfig = rawJobConfig.withFallback(getClusterConfig());
+
+      Config newConfig = ConfigFactory.parseMap(ImmutableMap
+          .of(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
+              ConfigurationKeys.JOB_ID_KEY, JOB_ID,
+              GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
+              GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
+              ConfigurationKeys.JOB_NAME_KEY, JOB_ID));
+
+      newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
+      newConfig = newConfig.withFallback(rawJobConfig);
+
+      Properties jobProperties = ConfigUtils.configToProperties(newConfig);
+      Map<String, String> jobPropertiesAsMap = new HashMap<>();
+      for (String name : jobProperties.stringPropertyNames()) {
+        jobPropertiesAsMap.put(name, jobProperties.getProperty(name));
+      }
+      return jobPropertiesAsMap;
+    }
+  }
+
+  @Override
+  public Config getManagerConfig() {
+    Config managerConfig = super.getManagerConfig();
+    managerConfig = managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
+        ConfigValueFactory.fromAnyRef(FsScheduledJobConfigurationManager.class.getName()))
+        .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            ConfigValueFactory.fromAnyRef(JOB_CATALOG_DIR))
+    .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
+        .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(5L))
+    .withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
+    return managerConfig;
+  }
+
+  public void addJobSpec(String jobSpecName, String verb) throws IOException {
+    Map<String, String> metadataMap = new HashMap<>();
+    metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
+
+    Map<String, String> jobProperties = new HashMap<>();
+    if (SpecExecutor.Verb.ADD.name().equals(verb)) {
+      jobProperties = getJobConfig();
+    } else if (SpecExecutor.Verb.DELETE.name().equals(verb)) {
+      jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    } else if (SpecExecutor.Verb.UPDATE.name().equals(verb)) {
+      jobProperties = getJobConfig();
+      jobProperties.put(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    }
+
+    AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
+        setUri(Files.getNameWithoutExtension(jobSpecName)).
+        setProperties(jobProperties).
+        setTemplateUri("FS:///").
+        setDescription("HelloWorldTestJob").
+        setVersion("1").
+        setMetadata(metadataMap).build();
+
+    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+    DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
+
+    Path fsSpecConsumerPath = new Path(FS_SPEC_CONSUMER_DIR, jobSpecName);
+    FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
+    OutputStream out = fs.create(fsSpecConsumerPath);
+
+    dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+    dataFileWriter.append(jobSpec);
+    dataFileWriter.close();
+  }
+}
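
On the consumer side, a spec file written by addJobSpec() can be read back with
Avro's standard file reader. A sketch assuming the generated AvroJobSpec class
and an illustrative path; this mirrors what FsSpecConsumer does in the next
diff:

    File specFile = new File("/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs/myJob.avro"); // illustrative
    try (DataFileReader<AvroJobSpec> reader =
        new DataFileReader<>(specFile, new SpecificDatumReader<>(AvroJobSpec.SCHEMA$))) {
      for (AvroJobSpec spec : reader) {
        // The Verb (ADD/UPDATE/DELETE) travels in the spec's metadata map.
        String verb = spec.getMetadata().get(FsSpecConsumer.VERB_KEY);
        System.out.println(spec.getUri() + " -> " + verb);
      }
    }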
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 58a2f99..a18be23 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -62,7 +64,9 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
     }
   }
 
-  /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
+  /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}.
+   * The {@link Spec}s are returned in increasing order of their modification times.
+   */
   @Override
   public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
     List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
@@ -74,6 +78,11 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
       return null;
     }
 
+    //Sort the {@link JobSpec}s in increasing order of their modification times.
+    //This is done so that the {@link JobSpec}s can be handled in FIFO order by the
+    //JobConfigurationManager and eventually, the GobblinHelixJobScheduler.
+    Arrays.sort(fileStatuses, Comparator.comparingLong(FileStatus::getModificationTime));
+
     for (FileStatus fileStatus : fileStatuses) {
       DataFileReader<AvroJobSpec> dataFileReader;
       try {