You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/14 04:01:45 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-847] Flow level sla

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8006ac9  [GOBBLIN-847] Flow level sla
8006ac9 is described below

commit 8006ac900554df0a33ba1c1aac9205ef30841d17
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Aug 13 21:01:34 2019 -0700

    [GOBBLIN-847] Flow level sla
    
    Closes #2702 from arjun4084346/flowLevelSla
---
 .../gobblin/configuration/ConfigurationKeys.java   |   8 +
 .../apache/gobblin/metrics/event/TimingEvent.java  |   3 +
 .../spec_executorInstance/MockedSpecExecutor.java  |   2 +
 .../service/modules/orchestration/DagManager.java  | 167 +++++++++++++++-----
 .../modules/orchestration/DagManagerUtils.java     |  60 ++++++-
 .../modules/orchestration/DagManagerFlowTest.java  | 172 ++++++++++++++++++---
 6 files changed, 342 insertions(+), 70 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c438e77..f873a0d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -890,6 +890,9 @@ public class ConfigurationKeys {
    * Configuration properties related to Flows
    */
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
+  public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
+  public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
+  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
 
   /***
    * Configuration properties related to TopologySpec Store
@@ -905,6 +908,11 @@ public class ConfigurationKeys {
   public static final String SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY = "specExecInstance.capabilities";
 
   /***
+   * Configuration properties related to Spec Producer
+   */
+  public static final String SPEC_PRODUCER_SERIALIZED_FUTURE = "specProducer.serialized.future";
+
+  /***
    * Configuration properties related to Compaction Suite
    */
   public static final String COMPACTION_PREFIX = "compaction.";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index db30229..044eba2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -67,6 +67,9 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
   public static class FlowTimings {
     public static final String FLOW_COMPILED = "FlowCompiled";
     public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed";
+    public static final String FLOW_CANCEL = "FlowCancelled";
+    public static final String FLOW_SUCCEEDED = "FlowSucceeded";
+    public static final String FLOW_FAILED = "FlowFailed";
   }
 
   public static class FlowEventConstants {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 10f7786..4a31a6e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -43,6 +43,8 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
     super(config);
     this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
     when(mockedSpecProducer.addSpec(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
+    when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
+    when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
     }
 
   public static SpecExecutor createDummySpecExecutor(URI uri) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ab4f73f..a8c2d19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -75,6 +74,7 @@ import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
 import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
 import static org.apache.gobblin.service.ExecutionStatus.FAILED;
 import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
@@ -98,7 +98,7 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
  *
  * For deleteSpec/cancellation requests for a flow URI, {@link DagManager} finds out the flowExecutionId using
  * {@link JobStatusRetriever}, and forwards the request to the {@link DagManagerThread} which handled the addSpec request
- * for this flow. We need separate {@value queue} and {@value cancelQueue} for each {@link DagManagerThread} because
+ * for this flow. We need separate {@link BlockingQueue}s for each {@link DagManagerThread} because
  * cancellation needs the information which is stored only in the same {@link DagManagerThread}.
  *
  * The {@link DagManager} is active only in the leader mode. To ensure, each {@link Dag} managed by a {@link DagManager} is
@@ -157,6 +157,7 @@ public class DagManager extends AbstractIdleService {
   private DagStateStore dagStateStore;
   private Map<URI, TopologySpec> topologySpecMap;
 
+  @Getter
   private final Integer numThreads;
   private final Integer pollingInterval;
   @Getter
@@ -238,8 +239,7 @@ public class DagManager extends AbstractIdleService {
   synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
     //Persist the dag
     this.dagStateStore.writeCheckpoint(dag);
-    long flowExecutionId = DagManagerUtils.getFlowExecId(dag);
-    int queueId = (int) (flowExecutionId % this.numThreads);
+    int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
     // Add the dag to the specific queue determined by flowExecutionId
     // Flow cancellation request has to be forwarded to the same DagManagerThread where the
     // flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
@@ -273,7 +273,7 @@ public class DagManager extends AbstractIdleService {
     log.info("Found {} flows to cancel.", flowExecutionIds.size());
 
     for (long flowExecutionId : flowExecutionIds) {
-      int queueId = (int) (flowExecutionId % this.numThreads);
+      int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
       String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
       if (!this.cancelQueue[queueId].offer(dagId)) {
         throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
@@ -347,7 +347,9 @@ public class DagManager extends AbstractIdleService {
   public static class DagManagerThread implements Runnable {
     private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+    // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+    final Map<String, Long> dagToSLA = new HashMap<>();
     private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
     private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
     private final MetricContext metricContext;
@@ -382,6 +384,7 @@ public class DagManager extends AbstractIdleService {
     /**
      * Main body of the {@link DagManagerThread}. Deque the next item from the queue and poll job statuses of currently
      * running jobs.
+     * Because this thread runs at a regular interval, we should avoid doing repetitive work inside it.
      */
     @Override
     public void run() {
@@ -414,27 +417,41 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    /**
+     * Cancels the dag and sends a cancellation tracking event.
+     * @param dagToCancel id of the dag to cancel
+     * @throws ExecutionException executionException
+     * @throws InterruptedException interruptedException
+     */
     private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {
       log.info("Cancel flow with DagId {}", dagToCancel);
       if (this.dagToJobs.containsKey(dagToCancel)) {
         List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);
         log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
         for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
-          Properties props = new Properties();
-          if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-            Future future = dagNodeToCancel.getValue().getJobFuture().get();
-            if (future instanceof CompletableFuture &&
-              future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
-              CompletableFuture<AzkabanExecuteFlowStatus.ExecuteId> completableFuture = (CompletableFuture) future;
-              String azkabanExecId = completableFuture.get().getExecId();
-              props.put(ConfigurationKeys.AZKABAN_EXEC_ID, azkabanExecId);
-              log.info("Cancel job with azkaban exec id {}.", azkabanExecId);
-            }
-          }
-          DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null, props);
+          cancelDagNode(dagNodeToCancel);
         }
       } else {
-        log.warn("Did not find Dag with id {}, it might be already cancelled.", dagToCancel);
+        log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
+      }
+    }
+
+    private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
+      Properties props = new Properties();
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      }
+      DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null, props);
+    }
+
+    private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+      if (this.eventSubmitter.isPresent()) {
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(CANCELLED);
       }
     }
 
@@ -473,22 +490,20 @@ public class DagManager extends AbstractIdleService {
     /**
      * Proceed the execution of each dag node based on job status.
      */
-    private void pollAndAdvanceDag()
-        throws IOException {
+    private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
       this.failedDagIdsFinishRunning.clear();
-
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
+
       for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
-        long pollStartTime = System.nanoTime();
+        boolean slaKilled = slaKillIfNeeded(node);
+
         JobStatus jobStatus = pollJobStatus(node);
-        Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
-        if (jobStatus == null) {
-          continue;
-        }
+
+        ExecutionStatus status = getJobExecutionStatus(slaKilled, jobStatus);
+
         JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
 
-        ExecutionStatus status = valueOf(jobStatus.getEventName());
         switch (status) {
           case COMPLETE:
             jobExecutionPlan.setExecutionStatus(COMPLETE);
@@ -496,11 +511,15 @@ public class DagManager extends AbstractIdleService {
             nodesToCleanUp.add(node);
             break;
           case FAILED:
-          case CANCELLED:
             jobExecutionPlan.setExecutionStatus(FAILED);
             nextSubmitted.putAll(onJobFinish(node));
             nodesToCleanUp.add(node);
             break;
+          case CANCELLED:
+            jobExecutionPlan.setExecutionStatus(CANCELLED);
+            nextSubmitted.putAll(onJobFinish(node));
+            nodesToCleanUp.add(node);
+            break;
           case PENDING:
             jobExecutionPlan.setExecutionStatus(PENDING);
             break;
@@ -509,7 +528,7 @@ public class DagManager extends AbstractIdleService {
             break;
         }
 
-        if (jobStatus.isShouldRetry()) {
+        if (jobStatus != null && jobStatus.isShouldRetry()) {
           log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
               jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
           submitJob(node);
@@ -519,17 +538,62 @@ public class DagManager extends AbstractIdleService {
       for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
         String dagId = entry.getKey();
         Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
-        for (DagNode dagNode: dagNodes) {
+        for (DagNode<JobExecutionPlan> dagNode: dagNodes) {
           addJobState(dagId, dagNode);
         }
       }
 
       for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
-        String dagId = DagManagerUtils.generateDagId(this.jobToDag.get(dagNode));
+        String dagId = DagManagerUtils.generateDagId(dagNode);
         deleteJobState(dagId, dagNode);
       }
     }
 
+    private ExecutionStatus getJobExecutionStatus(boolean slaKilled, JobStatus jobStatus) {
+      if (slaKilled) {
+        return CANCELLED;
+      } else {
+        if (jobStatus == null) {
+          return PENDING;
+        } else {
+          return valueOf(jobStatus.getEventName());
+        }
+      }
+    }
+
+    /**
+     * Cancels the job of the given dag node if its flow has run longer than the configured SLA.
+     * The SLA is resolved once per dag and cached in {@code dagToSLA}; {@code NO_SLA} disables the check.
+     *
+     * @param node dag node of the job
+     * @return true if the job was cancelled because the flow exceeded its SLA, false otherwise
+     * @throws ExecutionException propagated from {@code cancelDagNode}
+     * @throws InterruptedException propagated from {@code cancelDagNode}
+     */
+    private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+      long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+      long currentTime = System.currentTimeMillis();
+      String dagId = DagManagerUtils.generateDagId(node);
+      // Look up the SLA only once per dag; later polls reuse the cached value.
+      long flowSla;
+      if (dagToSLA.containsKey(dagId)) {
+        flowSla = dagToSLA.get(dagId);
+      } else {
+        flowSla = DagManagerUtils.getFlowSLA(node);
+        dagToSLA.put(dagId, flowSla);
+      }
+
+      if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime + flowSla) {
+        log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+            flowSla,
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+        cancelDagNode(node);
+        return true;
+      }
+      return false;
+    }
+
     /**
      * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
      */
@@ -541,8 +605,11 @@ public class DagManager extends AbstractIdleService {
       String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
       String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
 
+      long pollStartTime = System.nanoTime();
       Iterator<JobStatus> jobStatusIterator =
           this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+      Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
       if (jobStatusIterator.hasNext()) {
         return jobStatusIterator.next();
       } else {
@@ -620,6 +687,9 @@ public class DagManager extends AbstractIdleService {
     /**
      * Method that defines the actions to be performed when a job finishes either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions as necessary.
+     * TODO : Dag should have a status field, like JobExecutionPlan has. This method should update that field,
+     *        which should be used by cleanup(). It may also remove the need for failedDagIdsFinishRunning
+     *        and failedDagIdsFinishAllPossible.
      */
     private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
         throws IOException {
@@ -633,21 +703,28 @@ public class DagManager extends AbstractIdleService {
         getRunningJobsCounter(dagNode).dec();
       }
 
-      if (jobStatus == COMPLETE) {
-        return submitNext(dagId);
-      } else if (jobStatus == FAILED) {
-        if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
-          this.failedDagIdsFinishRunning.add(dagId);
-        } else {
-          this.failedDagIdsFinishAllPossible.add(dagId);
-        }
+      switch (jobStatus) {
+        // TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
+        case CANCELLED:
+        case FAILED:
+          if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
+            this.failedDagIdsFinishRunning.add(dagId);
+          } else {
+            this.failedDagIdsFinishAllPossible.add(dagId);
+          }
+          return Maps.newHashMap();
+        case COMPLETE:
+          return submitNext(dagId);
+        default:
+          log.warn("It should not reach here. Job status is unexpected.");
+          return Maps.newHashMap();
       }
-      return Maps.newHashMap();
     }
 
     private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
       this.jobToDag.remove(dagNode);
       this.dagToJobs.get(dagId).remove(dagNode);
+      this.dagToSLA.remove(dagId);
     }
 
     private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
@@ -688,18 +765,24 @@ public class DagManager extends AbstractIdleService {
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
+        // send an event before cleaning up dag
+        JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
+        DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, TimingEvent.FlowTimings.FLOW_FAILED);
         dagIdstoClean.add(dagId);
       }
 
       //Clean up completed dags
       for (String dagId : this.dags.keySet()) {
         if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
-          String status = "COMPLETE";
+          String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
           if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            status = "FAILED";
+            status = TimingEvent.FlowTimings.FLOW_FAILED;
             this.failedDagIdsFinishAllPossible.remove(dagId);
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
+          // send an event before cleaning up dag
+          JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, status);
           dagIdstoClean.add(dagId);
         }
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 46fc59a..5ab74e8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -19,14 +19,19 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
@@ -39,6 +44,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 public class DagManagerUtils {
+  static long NO_SLA = -1L;
 
   static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
@@ -48,8 +54,15 @@ public class DagManagerUtils {
   }
 
   static long getFlowExecId(Dag<JobExecutionPlan> dag) {
-    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
-    return jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    return getFlowExecId(dag.getStartNodes().get(0));
+  }
+
+  static long getFlowExecId(DagNode<JobExecutionPlan> dagNode) {
+    return getFlowExecId(dagNode.getValue().getJobSpec());
+  }
+
+  static long getFlowExecId(JobSpec jobSpec) {
+    return jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
   }
 
   /**
@@ -188,7 +201,8 @@ public class DagManagerUtils {
       return null;
     }
     DagNode<JobExecutionPlan> dagNode = dag.getStartNodes().get(0);
-    String failureOption = ConfigUtils.getString(getJobConfig(dagNode), ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
+    String failureOption = ConfigUtils.getString(getJobConfig(dagNode),
+        ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
     return FailureOption.valueOf(failureOption);
   }
 
@@ -198,4 +212,44 @@ public class DagManagerUtils {
   static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
     dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 1);
   }
+
+  /**
+   * Flow start time is assumed to be the same as the flow execution id, which is the timestamp at which the flow request was received.
+   * @param dagNode dag node in context
+   * @return flow start time, i.e. the flow execution id
+   */
+  static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
+    return getFlowExecId(dagNode);
+  }
+
+  /**
+   * get the sla from the dag node config.
+   * if time unit is not provided, it assumes time unit is minute.
+   * @param dagNode dag node for which sla is to be retrieved
+   * @return sla in milliseconds if it is provided, {@link #NO_SLA} otherwise
+   */
+  static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+        jobConfig, ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT));
+
+    return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME)
+        ? slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME))
+        : NO_SLA;
+  }
+
+  static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
+    return getDagQueueId(DagManagerUtils.getFlowExecId(dag), numThreads);
+  }
+
+  static int getDagQueueId(long flowExecutionId, int numThreads) {
+    return (int) (flowExecutionId % numThreads);
+  }
+
+  static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, JobExecutionPlan jobExecutionPlan, String flowEvent) {
+    if (eventSubmitter.isPresent()) {
+      Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+      eventSubmitter.get().getTimingEvent(flowEvent).stop(jobMetadata);
+    }
+  }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 1300458..60ac979 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -22,16 +22,21 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Predicate;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -50,7 +55,9 @@ import static org.mockito.Mockito.when;
 
 
 public class DagManagerFlowTest {
-  DagManager dagManager;
+  MockedDagManager dagManager;
+  int dagNumThreads;
+  static final String ERROR_MESSAGE = "Waiting for the map to update";
 
   @BeforeClass
   public void setUp() {
@@ -58,6 +65,7 @@ public class DagManagerFlowTest {
     props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
     dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
     dagManager.setActive(true);
+    this.dagNumThreads = dagManager.getNumThreads();
   }
 
   @Test
@@ -66,6 +74,21 @@ public class DagManagerFlowTest {
     Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L, "FINISH_RUNNING", 1);
     Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L, "FINISH_RUNNING", 1);
 
+    String dagId1 = DagManagerUtils.generateDagId(dag1);
+    String dagId2 = DagManagerUtils.generateDagId(dag2);
+    String dagId3 = DagManagerUtils.generateDagId(dag3);
+
+    int queue1 = DagManagerUtils.getDagQueueId(dag1, dagNumThreads);
+    int queue2 = DagManagerUtils.getDagQueueId(dag2, dagNumThreads);
+    int queue3 = DagManagerUtils.getDagQueueId(dag3, dagNumThreads);
+
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt()))
+        .thenReturn(Collections.singletonList(123456780L));
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt()))
+        .thenReturn(Collections.singletonList(123456781L));
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt()))
+        .thenReturn(Collections.singletonList(123456782L));
+
     // mock add spec
     dagManager.addDag(dag1);
     dagManager.addDag(dag2);
@@ -73,11 +96,11 @@ public class DagManagerFlowTest {
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
-        assertTrue(input -> dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)), "Waiting for the map to update");
+        assertTrue(input -> dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1), ERROR_MESSAGE);
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(input -> dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)), "Waiting for the map to update");
+        assertTrue(input -> dagManager.dagManagerThreads[queue2].dagToJobs.containsKey(dagId2), ERROR_MESSAGE);
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(input -> dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)), "Waiting for the map to update");
+        assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
 
     // mock delete spec
     dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
@@ -85,41 +108,142 @@ public class DagManagerFlowTest {
     dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
 
     // verify deleteSpec() of specProducer is called once
-    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), "Waiting for the map to update");
-    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), "Waiting for the map to update");
-    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), "Waiting for the map to update");
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
 
     // mock flow cancellation tracking event
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L, "group0", "job0", String.valueOf(
-        ExecutionStatus.CANCELLED)))
-        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0", 123456780L, "job0", "group0");
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L,
+        "group0", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0",
+        123456780L, "job0", "group0");
 
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L, "group1", "job0", String.valueOf(
-        ExecutionStatus.CANCELLED)))
-        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1", 123456781L, "job0", "group1");
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L,
+        "group1", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1",
+        123456781L, "job0", "group1");
 
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L, "group2", "job0", String.valueOf(
-        ExecutionStatus.CANCELLED)))
-        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2", 123456782L, "job0", "group2");
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L,
+        "group2", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2",
+        123456782L, "job0", "group2");
 
     // check removal of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
-        assertTrue(input -> !dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)), "Waiting for the map to update");
+        assertTrue(input -> !dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1), ERROR_MESSAGE);
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(input -> !dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)), "Waiting for the map to update");
+        assertTrue(input -> !dagManager.dagManagerThreads[queue2].dagToJobs.containsKey(dagId2), ERROR_MESSAGE);
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
-        assertTrue(input -> !dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)), "Waiting for the map to update");
+        assertTrue(input -> !dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
+  }
+
+  /** Without SLA config the dag must be tracked with {@link DagManagerUtils#NO_SLA} and must not get cancelled. */
+  @Test
+  void testFlowSlaWithoutConfig() throws Exception {
+    long flowExecutionId = System.currentTimeMillis();
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("3", flowExecutionId, "FINISH_RUNNING", 1);
+    String dagId = DagManagerUtils.generateDagId(dag);
+    int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow3"), eq("group3"), anyInt()))
+        .thenReturn(Collections.singletonList(flowExecutionId));
+
+    // mock add spec
+    dagManager.addDag(dag);
+
+    // check existence of dag in dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+    // check existence of dag in dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+    // check the SLA value
+    Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.NO_SLA);
+
+    // deleteSpec() must never be called here, so the TimeoutException below is the expected outcome
+    try {
+      AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+    } catch (TimeoutException e) {
+      AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+          assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+      return;
+    }
+    // reaching this point means deleteSpec() was observed, i.e. the dag got cancelled
+    Assert.fail("Job cancellation was triggered even though no SLA was configured.");
+  }
+
+  /** A dag whose start job configures a 7 second flow SLA must be registered with that SLA and then cancelled. */
+  @Test
+  void testFlowSlaWithConfig() throws Exception {
+    final long executionId = System.currentTimeMillis();
+    final Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("4", executionId, "FINISH_RUNNING", 1);
+    final String dagId = DagManagerUtils.generateDagId(dag);
+    final int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+    final long expectedSlaMillis = TimeUnit.SECONDS.toMillis(7L);
+
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
+        .thenReturn(Collections.singletonList(executionId));
+
+    // set a small SLA (7 seconds) in the config of the start job
+    final JobExecutionPlan startPlan = dag.getStartNodes().get(0).getValue();
+    Config slaConfig = startPlan.getJobSpec().getConfig();
+    slaConfig = slaConfig.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("7"));
+    slaConfig = slaConfig.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT,
+        ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+    startPlan.getJobSpec().setConfig(slaConfig);
+
+    // mock add spec
+    dagManager.addDag(dag);
+
+    // the dag must show up in the dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(
+        input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+    // the stored SLA must equal the configured 7 seconds expressed in milliseconds
+    Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), expectedSlaMillis);
+
+    // the dag must also be present in the dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(
+        input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+    // deleteSpec() of the specProducer being called once means job cancellation was triggered
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+
+    // afterwards the dag must be removed from the dagToSLA map again
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(
+        input -> !dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+  }
+
+  /** Checks {@link DagManagerUtils#getFlowSLA} without any SLA config and with SLAs given in seconds and minutes. */
+  @Test
+  void slaConfigCheck() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("5", 123456783L, "FINISH_RUNNING", 1);
+    // no SLA configured yet, so the NO_SLA sentinel is expected rather than a hard-coded number
+    Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), DagManagerUtils.NO_SLA);
+
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    jobConfig = jobConfig.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+    dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+    Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), TimeUnit.SECONDS.toMillis(7L));
+
+    jobConfig = jobConfig.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("8"))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.name()));
+    dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+    Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), TimeUnit.MINUTES.toMillis(8L));
   }
 }
 
-class DeletePredicate implements Predicate {
+class DeletePredicate implements Predicate<Void> {
   private final Dag<JobExecutionPlan> dag;
   public DeletePredicate(Dag<JobExecutionPlan> dag) {
     this.dag = dag;
   }
 
   @Override
-  public boolean apply(@Nullable Object input) {
+  public boolean apply(@Nullable Void input) {
     try {
       verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(), any());
     } catch (Throwable e) {
@@ -138,10 +262,8 @@ class MockedDagManager extends DagManager {
   @Override
   JobStatusRetriever createJobStatusRetriever(Config config) {
     JobStatusRetriever mockedJbStatusRetriever = Mockito.mock(JobStatusRetriever.class);
-    Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
-    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt())).thenReturn(Collections.singletonList(123456780L));
-    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt())).thenReturn(Collections.singletonList(123456781L));
-    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt())).thenReturn(Collections.singletonList(123456782L));
+    Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).
+        getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
     return  mockedJbStatusRetriever;
   }