You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/08/30 01:04:01 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 05f5640  [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
05f5640 is described below

commit 05f5640d557c4d6c4a9ddf4d986dcad6563e7a2f
Author: sv2000 <su...@gmail.com>
AuthorDate: Sat Aug 29 18:03:54 2020 -0700

    [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
    
    Closes #3093 from sv2000/flowSla
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  4 ++
 .../service/modules/orchestration/DagManager.java  |  2 +-
 .../modules/orchestration/DagManagerUtils.java     |  6 +--
 .../modules/orchestration/Orchestrator.java        |  6 ++-
 .../modules/orchestration/DagManagerFlowTest.java  | 44 +++++++++++-----------
 5 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 9b458ee..e161b78 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -118,4 +118,8 @@ public class ServiceConfigKeys {
 
   // Prefix for config to ServiceBasedAppLauncher that will only be used by GaaS and not orchestrated jobs
   public static final String GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX = "gobblinServiceAppLauncher";
+
+  // Service-wide default for whether concurrent executions of a flow are allowed; used when the flow config does not set it.
+  public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
+  public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index db1145a..906251d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -686,7 +686,7 @@ public class DagManager extends AbstractIdleService {
         dagToSLA.put(dagId, flowSla);
       }
 
-      if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime + flowSla) {
+      if (currentTime > flowStartTime + flowSla) {
         log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
             node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
             node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 8ca8966..248fe4d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -45,7 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 public class DagManagerUtils {
-  static long NO_SLA = -1L;
+  static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
   static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
@@ -255,7 +255,7 @@ public class DagManagerUtils {
    * get the sla from the dag node config.
    * if time unit is not provided, it assumes time unit is minute.
    * @param dagNode dag node for which sla is to be retrieved
-   * @return sla if it is provided, {@value NO_SLA} otherwise
+   * @return the SLA in milliseconds if it is provided, {@code DEFAULT_FLOW_SLA_MILLIS} (24 hours) otherwise
    */
   static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
@@ -264,7 +264,7 @@ public class DagManagerUtils {
 
     return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME)
         ? slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME))
-        : NO_SLA;
+        : DEFAULT_FLOW_SLA_MILLIS;
   }
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ecd16de..ae0daea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -90,6 +90,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   protected final MetricContext metricContext;
 
   protected final Optional<EventSubmitter> eventSubmitter;
+  private final boolean flowConcurrencyFlag;
   @Getter
   private Optional<Meter> flowOrchestrationSuccessFulMeter;
   @Getter
@@ -141,6 +142,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       this.flowOrchestrationTimer = Optional.absent();
       this.eventSubmitter = Optional.absent();
     }
+    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
   }
 
   public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
@@ -237,7 +240,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
       //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
       //running. If so, return immediately.
-      boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
+      boolean allowConcurrentExecution = ConfigUtils
+          .getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
 
       if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 85b8bca..ecd90fa 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -70,9 +70,13 @@ public class DagManagerFlowTest {
 
   @Test
   void testAddDeleteSpec() throws Exception {
-    Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", 123456780L, "FINISH_RUNNING", 1);
-    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L, "FINISH_RUNNING", 1);
-    Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L, "FINISH_RUNNING", 1);
+    Long flowExecutionId1 = System.currentTimeMillis();
+    Long flowExecutionId2 = flowExecutionId1 + 1;
+    Long flowExecutionId3 = flowExecutionId1 + 2;
+
+    Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", flowExecutionId1, "FINISH_RUNNING", 1);
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", flowExecutionId2, "FINISH_RUNNING", 1);
+    Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", flowExecutionId3, "FINISH_RUNNING", 1);
 
     String dagId1 = DagManagerUtils.generateDagId(dag1);
     String dagId2 = DagManagerUtils.generateDagId(dag2);
@@ -83,11 +87,11 @@ public class DagManagerFlowTest {
     int queue3 = DagManagerUtils.getDagQueueId(dag3, dagNumThreads);
 
     when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt()))
-        .thenReturn(Collections.singletonList(123456780L));
+        .thenReturn(Collections.singletonList(flowExecutionId1));
     when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt()))
-        .thenReturn(Collections.singletonList(123456781L));
+        .thenReturn(Collections.singletonList(flowExecutionId2));
     when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt()))
-        .thenReturn(Collections.singletonList(123456782L));
+        .thenReturn(Collections.singletonList(flowExecutionId3));
 
     // mock add spec
     dagManager.addDag(dag1, true);
@@ -113,20 +117,20 @@ public class DagManagerFlowTest {
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
 
     // mock flow cancellation tracking event
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L,
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", flowExecutionId1,
         "group0", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
         .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0",
-        123456780L, "job0", "group0");
+        flowExecutionId1, "job0", "group0");
 
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L,
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", flowExecutionId2,
         "group1", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
         .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1",
-        123456781L, "job0", "group1");
+        flowExecutionId2, "job0", "group1");
 
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L,
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", flowExecutionId3,
         "group2", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
         .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2",
-        123456782L, "job0", "group2");
+        flowExecutionId3, "job0", "group2");
 
     // check removal of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -159,7 +163,7 @@ public class DagManagerFlowTest {
         assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
 
     // check the SLA value
-    Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.NO_SLA);
+    Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
 
     // verify deleteSpec() of the specProducer is not called once
     // which means job cancellation was triggered
@@ -216,13 +220,11 @@ public class DagManagerFlowTest {
 
   @Test()
   void testOrphanFlowKill() throws Exception {
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", 234567891L, "FINISH_RUNNING", 1);
+    Long flowExecutionId = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", flowExecutionId, "FINISH_RUNNING", 1);
     String dagId = DagManagerUtils.generateDagId(dag);
     int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
 
-    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
-        .thenReturn(Collections.singletonList(234567891L));
-
     // change config to set a small sla
     Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
     jobConfig = jobConfig
@@ -237,10 +239,10 @@ public class DagManagerFlowTest {
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
 
-    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", 234567891L,
-        "group0", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", flowExecutionId,
+        "group6", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
         .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow6", "group6",
-        234567891L, "job0", "group6");
+        flowExecutionId, "job0", "group6");
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -258,7 +260,7 @@ public class DagManagerFlowTest {
   @Test
   void slaConfigCheck() throws Exception {
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("5", 123456783L, "FINISH_RUNNING", 1);
-    Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), -1L);
+    Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
 
     Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
     jobConfig = jobConfig