You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/08/30 01:04:01 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 05f5640 [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
05f5640 is described below
commit 05f5640d557c4d6c4a9ddf4d986dcad6563e7a2f
Author: sv2000 <su...@gmail.com>
AuthorDate: Sat Aug 29 18:03:54 2020 -0700
[GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
Closes #3093 from sv2000/flowSla
---
.../apache/gobblin/service/ServiceConfigKeys.java | 4 ++
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/DagManagerUtils.java | 6 +--
.../modules/orchestration/Orchestrator.java | 6 ++-
.../modules/orchestration/DagManagerFlowTest.java | 44 +++++++++++-----------
5 files changed, 36 insertions(+), 26 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 9b458ee..e161b78 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -118,4 +118,8 @@ public class ServiceConfigKeys {
// Prefix for config to ServiceBasedAppLauncher that will only be used by GaaS and not orchestrated jobs
public static final String GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX = "gobblinServiceAppLauncher";
+
+ //Flow concurrency config key to control default service behavior.
+ public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
+ public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index db1145a..906251d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -686,7 +686,7 @@ public class DagManager extends AbstractIdleService {
dagToSLA.put(dagId, flowSla);
}
- if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime + flowSla) {
+ if (currentTime > flowStartTime + flowSla) {
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 8ca8966..248fe4d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -45,7 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
- static long NO_SLA = -1L;
+ static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
@@ -255,7 +255,7 @@ public class DagManagerUtils {
* get the sla from the dag node config.
* if time unit is not provided, it assumes time unit is minute.
* @param dagNode dag node for which sla is to be retrieved
- * @return sla if it is provided, {@value NO_SLA} otherwise
+ * @return sla if it is provided, DEFAULT_FLOW_SLA_MILLIS otherwise
*/
static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
@@ -264,7 +264,7 @@ public class DagManagerUtils {
return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME)
? slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME))
- : NO_SLA;
+ : DEFAULT_FLOW_SLA_MILLIS;
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ecd16de..ae0daea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -90,6 +90,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
protected final MetricContext metricContext;
protected final Optional<EventSubmitter> eventSubmitter;
+ private final boolean flowConcurrencyFlag;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@Getter
@@ -141,6 +142,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
this.flowOrchestrationTimer = Optional.absent();
this.eventSubmitter = Optional.absent();
}
+ this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
@@ -237,7 +240,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
//If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
//running. If so, return immediately.
- boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
+ boolean allowConcurrentExecution = ConfigUtils
+ .getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 85b8bca..ecd90fa 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -70,9 +70,13 @@ public class DagManagerFlowTest {
@Test
void testAddDeleteSpec() throws Exception {
- Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", 123456780L, "FINISH_RUNNING", 1);
- Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L, "FINISH_RUNNING", 1);
- Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L, "FINISH_RUNNING", 1);
+ Long flowExecutionId1 = System.currentTimeMillis();
+ Long flowExecutionId2 = flowExecutionId1 + 1;
+ Long flowExecutionId3 = flowExecutionId1 + 2;
+
+ Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", flowExecutionId1, "FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", flowExecutionId2, "FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", flowExecutionId3, "FINISH_RUNNING", 1);
String dagId1 = DagManagerUtils.generateDagId(dag1);
String dagId2 = DagManagerUtils.generateDagId(dag2);
@@ -83,11 +87,11 @@ public class DagManagerFlowTest {
int queue3 = DagManagerUtils.getDagQueueId(dag3, dagNumThreads);
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt()))
- .thenReturn(Collections.singletonList(123456780L));
+ .thenReturn(Collections.singletonList(flowExecutionId1));
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt()))
- .thenReturn(Collections.singletonList(123456781L));
+ .thenReturn(Collections.singletonList(flowExecutionId2));
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt()))
- .thenReturn(Collections.singletonList(123456782L));
+ .thenReturn(Collections.singletonList(flowExecutionId3));
// mock add spec
dagManager.addDag(dag1, true);
@@ -113,20 +117,20 @@ public class DagManagerFlowTest {
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
// mock flow cancellation tracking event
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", flowExecutionId1,
"group0", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0",
- 123456780L, "job0", "group0");
+ flowExecutionId1, "job0", "group0");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", flowExecutionId2,
"group1", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1",
- 123456781L, "job0", "group1");
+ flowExecutionId2, "job0", "group1");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", flowExecutionId3,
"group2", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2",
- 123456782L, "job0", "group2");
+ flowExecutionId3, "job0", "group2");
// check removal of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -159,7 +163,7 @@ public class DagManagerFlowTest {
assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
// check the SLA value
- Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.NO_SLA);
+ Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
// verify deleteSpec() of the specProducer is not called once
// which means job cancellation was triggered
@@ -216,13 +220,11 @@ public class DagManagerFlowTest {
@Test()
void testOrphanFlowKill() throws Exception {
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", 234567891L, "FINISH_RUNNING", 1);
+ Long flowExecutionId = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", flowExecutionId, "FINISH_RUNNING", 1);
String dagId = DagManagerUtils.generateDagId(dag);
int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
- when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
- .thenReturn(Collections.singletonList(234567891L));
-
// change config to set a small sla
Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
jobConfig = jobConfig
@@ -237,10 +239,10 @@ public class DagManagerFlowTest {
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", 234567891L,
- "group0", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", flowExecutionId,
+ "group6", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow6", "group6",
- 234567891L, "job0", "group6");
+ flowExecutionId, "job0", "group6");
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -258,7 +260,7 @@ public class DagManagerFlowTest {
@Test
void slaConfigCheck() throws Exception {
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("5", 123456783L, "FINISH_RUNNING", 1);
- Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), -1L);
+ Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
jobConfig = jobConfig