You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/14 04:01:45 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-847] Flow level
sla
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8006ac9 [GOBBLIN-847] Flow level sla
8006ac9 is described below
commit 8006ac900554df0a33ba1c1aac9205ef30841d17
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Aug 13 21:01:34 2019 -0700
[GOBBLIN-847] Flow level sla
Closes #2702 from arjun4084346/flowLevelSla
---
.../gobblin/configuration/ConfigurationKeys.java | 8 +
.../apache/gobblin/metrics/event/TimingEvent.java | 3 +
.../spec_executorInstance/MockedSpecExecutor.java | 2 +
.../service/modules/orchestration/DagManager.java | 167 +++++++++++++++-----
.../modules/orchestration/DagManagerUtils.java | 60 ++++++-
.../modules/orchestration/DagManagerFlowTest.java | 172 ++++++++++++++++++---
6 files changed, 342 insertions(+), 70 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c438e77..f873a0d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -890,6 +890,9 @@ public class ConfigurationKeys {
* Configuration properties related to Flows
*/
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
+ public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
+ public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
+ public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
/***
* Configuration properties related to TopologySpec Store
@@ -905,6 +908,11 @@ public class ConfigurationKeys {
public static final String SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY = "specExecInstance.capabilities";
/***
+ * Configuration properties related to Spec Producer
+ */
+ public static final String SPEC_PRODUCER_SERIALIZED_FUTURE = "specProducer.serialized.future";
+
+ /***
* Configuration properties related to Compaction Suite
*/
public static final String COMPACTION_PREFIX = "compaction.";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index db30229..044eba2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -67,6 +67,9 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static class FlowTimings {
public static final String FLOW_COMPILED = "FlowCompiled";
public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed";
+ public static final String FLOW_CANCEL = "FlowCancelled";
+ public static final String FLOW_SUCCEEDED = "FlowSucceeded";
+ public static final String FLOW_FAILED = "FlowFailed";
}
public static class FlowEventConstants {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 10f7786..4a31a6e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -43,6 +43,8 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
super(config);
this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
when(mockedSpecProducer.addSpec(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
+ when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
+ when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
}
public static SpecExecutor createDummySpecExecutor(URI uri) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ab4f73f..a8c2d19 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -75,6 +74,7 @@ import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
import static org.apache.gobblin.service.ExecutionStatus.FAILED;
import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
@@ -98,7 +98,7 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
*
* For deleteSpec/cancellation requests for a flow URI, {@link DagManager} finds out the flowExecutionId using
* {@link JobStatusRetriever}, and forwards the request to the {@link DagManagerThread} which handled the addSpec request
- * for this flow. We need separate {@value queue} and {@value cancelQueue} for each {@link DagManagerThread} because
+ * for this flow. We need separate {@link BlockingQueue}s for each {@link DagManagerThread} because
* cancellation needs the information which is stored only in the same {@link DagManagerThread}.
*
* The {@link DagManager} is active only in the leader mode. To ensure, each {@link Dag} managed by a {@link DagManager} is
@@ -157,6 +157,7 @@ public class DagManager extends AbstractIdleService {
private DagStateStore dagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
+ @Getter
private final Integer numThreads;
private final Integer pollingInterval;
@Getter
@@ -238,8 +239,7 @@ public class DagManager extends AbstractIdleService {
synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
- long flowExecutionId = DagManagerUtils.getFlowExecId(dag);
- int queueId = (int) (flowExecutionId % this.numThreads);
+ int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
// flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
@@ -273,7 +273,7 @@ public class DagManager extends AbstractIdleService {
log.info("Found {} flows to cancel.", flowExecutionIds.size());
for (long flowExecutionId : flowExecutionIds) {
- int queueId = (int) (flowExecutionId % this.numThreads);
+ int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
if (!this.cancelQueue[queueId].offer(dagId)) {
throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
@@ -347,7 +347,9 @@ public class DagManager extends AbstractIdleService {
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+ // dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+ final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
private final MetricContext metricContext;
@@ -382,6 +384,7 @@ public class DagManager extends AbstractIdleService {
/**
* Main body of the {@link DagManagerThread}. Deque the next item from the queue and poll job statuses of currently
* running jobs.
+ * Because this thread runs at a regular interval, we should avoid doing repetitive work inside it.
*/
@Override
public void run() {
@@ -414,27 +417,41 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+ * Cancels the dag and sends a cancellation tracking event.
+ * @param dagToCancel id of the dag to cancel
+ * @throws ExecutionException executionException
+ * @throws InterruptedException interruptedException
+ */
private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {
log.info("Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);
log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
- Properties props = new Properties();
- if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- Future future = dagNodeToCancel.getValue().getJobFuture().get();
- if (future instanceof CompletableFuture &&
- future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
- CompletableFuture<AzkabanExecuteFlowStatus.ExecuteId> completableFuture = (CompletableFuture) future;
- String azkabanExecId = completableFuture.get().getExecId();
- props.put(ConfigurationKeys.AZKABAN_EXEC_ID, azkabanExecId);
- log.info("Cancel job with azkaban exec id {}.", azkabanExecId);
- }
- }
- DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null, props);
+ cancelDagNode(dagNodeToCancel);
}
} else {
- log.warn("Did not find Dag with id {}, it might be already cancelled.", dagToCancel);
+ log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
+ }
+ }
+
+ /**
+ * Cancels a single dag node by asking its spec producer to delete the spec.
+ * If the node has a job future, that future is serialized by the spec producer and handed to the delete
+ * request under {@link ConfigurationKeys#SPEC_PRODUCER_SERIALIZED_FUTURE}, and a cancellation event is
+ * sent before the delete request is issued. Without a job future the delete request gets empty properties
+ * and no cancellation event is sent.
+ *
+ * @param dagNodeToCancel dag node whose job should be cancelled
+ * @throws ExecutionException propagated from the cancellation calls (NOTE(review): confirm which call throws)
+ * @throws InterruptedException propagated from the cancellation calls (NOTE(review): confirm which call throws)
+ */
+ private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
+ Properties props = new Properties();
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ }
+ DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null, props);
+ }
+
+ /**
+ * Emits a {@code JOB_CANCEL} timing event for the given job and marks the job as CANCELLED.
+ * When no event submitter is present nothing happens; in particular the execution status is left untouched.
+ *
+ * @param jobExecutionPlan job execution plan of the job being cancelled
+ */
+ private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ if (!this.eventSubmitter.isPresent()) {
+ return;
+ }
+ Map<String, String> cancelMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(cancelMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ }
@@ -473,22 +490,20 @@ public class DagManager extends AbstractIdleService {
/**
* Proceed the execution of each dag node based on job status.
*/
- private void pollAndAdvanceDag()
- throws IOException {
+ private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
this.failedDagIdsFinishRunning.clear();
-
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
+
for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
- long pollStartTime = System.nanoTime();
+ boolean slaKilled = slaKillIfNeeded(node);
+
JobStatus jobStatus = pollJobStatus(node);
- Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
- if (jobStatus == null) {
- continue;
- }
+
+ ExecutionStatus status = getJobExecutionStatus(slaKilled, jobStatus);
+
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
- ExecutionStatus status = valueOf(jobStatus.getEventName());
switch (status) {
case COMPLETE:
jobExecutionPlan.setExecutionStatus(COMPLETE);
@@ -496,11 +511,15 @@ public class DagManager extends AbstractIdleService {
nodesToCleanUp.add(node);
break;
case FAILED:
- case CANCELLED:
jobExecutionPlan.setExecutionStatus(FAILED);
nextSubmitted.putAll(onJobFinish(node));
nodesToCleanUp.add(node);
break;
+ case CANCELLED:
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
case PENDING:
jobExecutionPlan.setExecutionStatus(PENDING);
break;
@@ -509,7 +528,7 @@ public class DagManager extends AbstractIdleService {
break;
}
- if (jobStatus.isShouldRetry()) {
+ if (jobStatus != null && jobStatus.isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
submitJob(node);
@@ -519,17 +538,62 @@ public class DagManager extends AbstractIdleService {
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
String dagId = entry.getKey();
Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
- for (DagNode dagNode: dagNodes) {
+ for (DagNode<JobExecutionPlan> dagNode: dagNodes) {
addJobState(dagId, dagNode);
}
}
for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
- String dagId = DagManagerUtils.generateDagId(this.jobToDag.get(dagNode));
+ String dagId = DagManagerUtils.generateDagId(dagNode);
deleteJobState(dagId, dagNode);
}
}
+ /**
+ * Derives the {@link ExecutionStatus} the dag manager should act on for a job.
+ *
+ * @param slaKilled true if the job was killed for exceeding the flow SLA
+ * @param jobStatus polled job status, may be null when no status is available
+ * @return CANCELLED if the job was sla killed; otherwise PENDING when there is no job status,
+ * or the status parsed from the job status event name
+ */
+ private ExecutionStatus getJobExecutionStatus(boolean slaKilled, JobStatus jobStatus) {
+ if (slaKilled) {
+ return CANCELLED;
+ }
+ return jobStatus == null ? PENDING : valueOf(jobStatus.getEventName());
+ }
+
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will try to cancel the job when SLA is reached.
+ * The SLA (in milliseconds) is looked up once per dag and cached in {@code dagToSLA}, because this
+ * method is invoked for every running job on every polling cycle.
+ *
+ * @param node dag node of the job
+ * @return true if the job is killed because it reached sla
+ * @throws ExecutionException if cancelling the dag node fails
+ * @throws InterruptedException if cancelling the dag node is interrupted
+ */
+ private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node);
+
+ long flowSla;
+ if (dagToSLA.containsKey(dagId)) {
+ flowSla = dagToSLA.get(dagId);
+ } else {
+ flowSla = DagManagerUtils.getFlowSLA(node);
+ dagToSLA.put(dagId, flowSla);
+ }
+
+ // Compare elapsed time against the SLA instead of computing flowStartTime + flowSla: a very large SLA
+ // (TimeUnit.toMillis saturates at Long.MAX_VALUE) would overflow the sum and kill the job immediately.
+ if (flowSla != DagManagerUtils.NO_SLA && currentTime - flowStartTime > flowSla) {
+ // Arguments must follow the placeholder order: flow name, sla, job name.
+ log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ cancelDagNode(node);
+ return true;
+ }
+ return false;
+ }
+
/**
* Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
*/
@@ -541,8 +605,11 @@ public class DagManager extends AbstractIdleService {
String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+ long pollStartTime = System.nanoTime();
Iterator<JobStatus> jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
if (jobStatusIterator.hasNext()) {
return jobStatusIterator.next();
} else {
@@ -620,6 +687,9 @@ public class DagManager extends AbstractIdleService {
/**
* Method that defines the actions to be performed when a job finishes either successfully or with failure.
* This method updates the state of the dag and performs clean up actions as necessary.
+ * TODO : Dag should have a status field, like JobExecutionPlan has. This method should update that field,
+ * which should be used by cleanup(). It may also remove the need of failedDagIdsFinishRunning,
+ * failedDagIdsFinishAllPossible.
*/
private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
@@ -633,21 +703,28 @@ public class DagManager extends AbstractIdleService {
getRunningJobsCounter(dagNode).dec();
}
- if (jobStatus == COMPLETE) {
- return submitNext(dagId);
- } else if (jobStatus == FAILED) {
- if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
- this.failedDagIdsFinishRunning.add(dagId);
- } else {
- this.failedDagIdsFinishAllPossible.add(dagId);
- }
+ switch (jobStatus) {
+ // TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
+ case CANCELLED:
+ case FAILED:
+ if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
+ this.failedDagIdsFinishRunning.add(dagId);
+ } else {
+ this.failedDagIdsFinishAllPossible.add(dagId);
+ }
+ return Maps.newHashMap();
+ case COMPLETE:
+ return submitNext(dagId);
+ default:
+ log.warn("It should not reach here. Job status is unexpected.");
+ return Maps.newHashMap();
}
- return Maps.newHashMap();
}
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
+ this.dagToSLA.remove(dagId);
}
private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
@@ -688,18 +765,24 @@ public class DagManager extends AbstractIdleService {
deleteJobState(dagId, dagNode);
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
+ // send an event before cleaning up dag
+ JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, TimingEvent.FlowTimings.FLOW_FAILED);
dagIdstoClean.add(dagId);
}
//Clean up completed dags
for (String dagId : this.dags.keySet()) {
if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
- String status = "COMPLETE";
+ String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- status = "FAILED";
+ status = TimingEvent.FlowTimings.FLOW_FAILED;
this.failedDagIdsFinishAllPossible.remove(dagId);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
+ // send an event before cleaning up dag
+ JobExecutionPlan jobExecutionPlan = this.dags.get(dagId).getNodes().get(0).getValue();
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, jobExecutionPlan, status);
dagIdstoClean.add(dagId);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 46fc59a..5ab74e8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -19,14 +19,19 @@ package org.apache.gobblin.service.modules.orchestration;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
@@ -39,6 +44,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
+ static long NO_SLA = -1L;
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
@@ -48,8 +54,15 @@ public class DagManagerUtils {
}
static long getFlowExecId(Dag<JobExecutionPlan> dag) {
- Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
- return jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ return getFlowExecId(dag.getStartNodes().get(0));
+ }
+
+ /**
+ * @param dagNode dag node whose job spec holds the flow execution id
+ * @return the flow execution id read from the job spec of the dag node
+ */
+ static long getFlowExecId(DagNode<JobExecutionPlan> dagNode) {
+ return getFlowExecId(dagNode.getValue().getJobSpec());
+ }
+
+ /**
+ * @param jobSpec job spec whose config holds the flow execution id
+ * @return the value of {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} in the job spec config, as a long
+ */
+ static long getFlowExecId(JobSpec jobSpec) {
+ return jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ }
/**
@@ -188,7 +201,8 @@ public class DagManagerUtils {
return null;
}
DagNode<JobExecutionPlan> dagNode = dag.getStartNodes().get(0);
- String failureOption = ConfigUtils.getString(getJobConfig(dagNode), ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
+ String failureOption = ConfigUtils.getString(getJobConfig(dagNode),
+ ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
return FailureOption.valueOf(failureOption);
}
@@ -198,4 +212,44 @@ public class DagManagerUtils {
static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 1);
}
+
+ /**
+ * Flow start time is assumed to be the same as the flow execution id, which is the timestamp at which the flow request was received.
+ * @param dagNode dag node in context
+ * @return flow start time, i.e. the flow execution id
+ */
+ static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
+ return getFlowExecId(dagNode);
+ }
+
+ /**
+ * Gets the flow SLA, converted to milliseconds, from the dag node's job config.
+ * If the time unit is not provided, the SLA time is interpreted as minutes.
+ * The time unit is parsed even when no SLA time is configured, so an unrecognized unit name makes
+ * {@link TimeUnit#valueOf(String)} throw regardless.
+ * @param dagNode dag node for which sla is to be retrieved
+ * @return sla in milliseconds if it is provided, {@link #NO_SLA} otherwise
+ */
+ static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+ jobConfig, ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT));
+
+ return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME)
+ ? slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME))
+ : NO_SLA;
+ }
+
+ /**
+ * Determines the queue (and hence the dag manager thread) a dag is routed to.
+ * @param dag dag whose flow execution id selects the queue
+ * @param numThreads number of queues / dag manager threads
+ * @return queue index in the range [0, numThreads)
+ */
+ static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
+ return getDagQueueId(DagManagerUtils.getFlowExecId(dag), numThreads);
+ }
+
+ /**
+ * Determines the queue (and hence the dag manager thread) a flow execution is routed to.
+ * Add and cancel requests must both use this method so that they reach the same queue.
+ * @param flowExecutionId flow execution id that selects the queue
+ * @param numThreads number of queues / dag manager threads
+ * @return queue index in the range [0, numThreads)
+ */
+ static int getDagQueueId(long flowExecutionId, int numThreads) {
+ // floorMod keeps the index non-negative even for a negative flow execution id, where % would
+ // produce a negative array index; for non-negative ids the result is identical to %.
+ return (int) Math.floorMod(flowExecutionId, (long) numThreads);
+ }
+
+ /**
+ * Emits a timing event named {@code flowEvent}, carrying the metadata derived from the given job execution plan.
+ * Does nothing when no event submitter is present.
+ *
+ * @param eventSubmitter optional submitter used to create the timing event
+ * @param jobExecutionPlan job execution plan the event metadata is built from
+ * @param flowEvent name of the timing event to emit
+ */
+ static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, JobExecutionPlan jobExecutionPlan, String flowEvent) {
+ if (!eventSubmitter.isPresent()) {
+ return;
+ }
+ Map<String, String> flowMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
+ }
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 1300458..60ac979 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -22,16 +22,21 @@ import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Predicate;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -50,7 +55,9 @@ import static org.mockito.Mockito.when;
public class DagManagerFlowTest {
- DagManager dagManager;
+ MockedDagManager dagManager;
+ int dagNumThreads;
+ static final String ERROR_MESSAGE = "Waiting for the map to update";
@BeforeClass
public void setUp() {
@@ -58,6 +65,7 @@ public class DagManagerFlowTest {
props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false);
dagManager.setActive(true);
+ this.dagNumThreads = dagManager.getNumThreads();
}
@Test
@@ -66,6 +74,21 @@ public class DagManagerFlowTest {
Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L, "FINISH_RUNNING", 1);
Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L, "FINISH_RUNNING", 1);
+ String dagId1 = DagManagerUtils.generateDagId(dag1);
+ String dagId2 = DagManagerUtils.generateDagId(dag2);
+ String dagId3 = DagManagerUtils.generateDagId(dag3);
+
+ int queue1 = DagManagerUtils.getDagQueueId(dag1, dagNumThreads);
+ int queue2 = DagManagerUtils.getDagQueueId(dag2, dagNumThreads);
+ int queue3 = DagManagerUtils.getDagQueueId(dag3, dagNumThreads);
+
+ when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt()))
+ .thenReturn(Collections.singletonList(123456780L));
+ when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt()))
+ .thenReturn(Collections.singletonList(123456781L));
+ when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt()))
+ .thenReturn(Collections.singletonList(123456782L));
+
// mock add spec
dagManager.addDag(dag1);
dagManager.addDag(dag2);
@@ -73,11 +96,11 @@ public class DagManagerFlowTest {
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
- assertTrue(input -> dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)), "Waiting for the map to update");
+ assertTrue(input -> dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1), ERROR_MESSAGE);
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
- assertTrue(input -> dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)), "Waiting for the map to update");
+ assertTrue(input -> dagManager.dagManagerThreads[queue2].dagToJobs.containsKey(dagId2), ERROR_MESSAGE);
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
- assertTrue(input -> dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)), "Waiting for the map to update");
+ assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
// mock delete spec
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
@@ -85,41 +108,142 @@ public class DagManagerFlowTest {
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
// verify deleteSpec() of specProducer is called once
- AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), "Waiting for the map to update");
- AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), "Waiting for the map to update");
- AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), "Waiting for the map to update");
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
// mock flow cancellation tracking event
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L, "group0", "job0", String.valueOf(
- ExecutionStatus.CANCELLED)))
- .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0", 123456780L, "job0", "group0");
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 123456780L,
+ "group0", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+ .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0", "group0",
+ 123456780L, "job0", "group0");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L, "group1", "job0", String.valueOf(
- ExecutionStatus.CANCELLED)))
- .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1", 123456781L, "job0", "group1");
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 123456781L,
+ "group1", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+ .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1", "group1",
+ 123456781L, "job0", "group1");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L, "group2", "job0", String.valueOf(
- ExecutionStatus.CANCELLED)))
- .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2", 123456782L, "job0", "group2");
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 123456782L,
+ "group2", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
+ .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2", "group2",
+ 123456782L, "job0", "group2");
// check removal of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
- assertTrue(input -> !dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)), "Waiting for the map to update");
+ assertTrue(input -> !dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1), ERROR_MESSAGE);
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
- assertTrue(input -> !dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)), "Waiting for the map to update");
+ assertTrue(input -> !dagManager.dagManagerThreads[queue2].dagToJobs.containsKey(dagId2), ERROR_MESSAGE);
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
- assertTrue(input -> !dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)), "Waiting for the map to update");
+ assertTrue(input -> !dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
+ }
+
+ @Test
+ void testFlowSlaWithoutConfig() throws Exception {
+ long flowExecutionId = System.currentTimeMillis();
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("3", flowExecutionId, "FINISH_RUNNING", 1);
+ String dagId = DagManagerUtils.generateDagId(dag);
+ int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+
+ when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow3"), eq("group3"), anyInt()))
+ .thenReturn(Collections.singletonList(flowExecutionId));
+
+ // mock add spec
+ dagManager.addDag(dag);
+
+ // check existence of dag in dagToJobs map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+ // check existence of dag in dagToSLA map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+ // check the SLA value
+ Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.NO_SLA);
+
+ // verify deleteSpec() of the specProducer is never called,
+ // which means job cancellation was not triggered
+ try {
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+ } catch (TimeoutException e) {
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+ return;
+ }
+
+ Assert.fail("Job cancellation was not triggered.");
+ }
+
+ @Test()
+ void testFlowSlaWithConfig() throws Exception {
+ long flowExecutionId = System.currentTimeMillis();
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("4", flowExecutionId, "FINISH_RUNNING", 1);
+ String dagId = DagManagerUtils.generateDagId(dag);
+ int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+
+ when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"), eq("group4"), anyInt()))
+ .thenReturn(Collections.singletonList(flowExecutionId));
+
+ // change config to set a small sla
+ Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ jobConfig = jobConfig
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+ dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+
+ // mock add spec
+ dagManager.addDag(dag);
+
+ // check existence of dag in dagToSLA map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+ // check the SLA value
+ Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), TimeUnit.SECONDS.toMillis(7L));
+
+
+ // check existence of dag in dagToJobs map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+ // verify deleteSpec() of specProducer is called once
+ // which means job cancellation was triggered
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+
+ // check removal of dag from dagToSLA map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input -> !dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+ }
+
+ @Test
+ void slaConfigCheck() throws Exception {
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("5", 123456783L, "FINISH_RUNNING", 1);
+ Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), -1L);
+
+ Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ jobConfig = jobConfig
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+ dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+ Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), TimeUnit.SECONDS.toMillis(7L));
+
+ jobConfig = jobConfig
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef("8"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.name()));
+ dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+ Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)), TimeUnit.MINUTES.toMillis(8L));
}
}
-class DeletePredicate implements Predicate {
+class DeletePredicate implements Predicate<Void> {
private final Dag<JobExecutionPlan> dag;
public DeletePredicate(Dag<JobExecutionPlan> dag) {
this.dag = dag;
}
@Override
- public boolean apply(@Nullable Object input) {
+ public boolean apply(@Nullable Void input) {
try {
verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(), any());
} catch (Throwable e) {
@@ -138,10 +262,8 @@ class MockedDagManager extends DagManager {
@Override
JobStatusRetriever createJobStatusRetriever(Config config) {
JobStatusRetriever mockedJbStatusRetriever = Mockito.mock(JobStatusRetriever.class);
- Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
- when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow0"), eq("group0"), anyInt())).thenReturn(Collections.singletonList(123456780L));
- when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow1"), eq("group1"), anyInt())).thenReturn(Collections.singletonList(123456781L));
- when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow2"), eq("group2"), anyInt())).thenReturn(Collections.singletonList(123456782L));
+ Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).
+ getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
return mockedJbStatusRetriever;
}