You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/25 22:45:14 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 10b7333 [GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]
10b7333 is described below
commit 10b73339187e85c03924143d923699e9bfc9b770
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Nov 25 14:45:06 2019 -0800
[GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]
Closes #2825 from sv2000/killOrphanedJob
---
.../gobblin/configuration/ConfigurationKeys.java | 8 +++---
.../apache/gobblin/runtime/api/SpecProducer.java | 4 +++
.../service/modules/orchestration/DagManager.java | 19 +++++++++----
.../modules/orchestration/DagManagerUtils.java | 32 ++++++++++++++++------
.../monitoring/KafkaAvroJobStatusMonitor.java | 2 +-
.../modules/orchestration/DagManagerFlowTest.java | 4 +--
.../modules/orchestration/DagManagerTest.java | 2 +-
7 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index bc810a6..79d1280 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -916,10 +916,10 @@ public class ConfigurationKeys {
public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
- public static final String GOBBLIN_FLOW_START_SLA_TIME = "gobblin.flow.start.sla.time";
- public static final String GOBBLIN_FLOW_START_SLA_TIME_UNIT = "gobblin.flow.start.sla.timeunit";
- public static final long DEFAULT_GOBBLIN_FLOW_START_SLA = 10L;
- public static final String DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT = "MINUTES";
+ public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time";
+ public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit";
+ public static final long DEFAULT_GOBBLIN_JOB_START_SLA_TIME = 10L;
+ public static final String DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT = "MINUTES";
public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 790e1f4..558827f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -50,6 +50,10 @@ public interface SpecProducer<V> {
/** List all {@link Spec} being executed on {@link SpecExecutor}. */
Future<? extends List<V>> listSpecs();
+ default String getExecutionLink(Future<?> future, String specExecutorUri) {
+ return "";
+ }
+
default String serializeAddSpecResponse(Future<?> response) {
return "";
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a5bcd4a..048365e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -554,7 +554,8 @@ public class DagManager extends AbstractIdleService {
* {@link ExecutionStatus} for some specific amount of time.
* @param node {@link DagNode} representing the job
* @param jobStatus current {@link JobStatus} of the job
- * @return true if the job status remains ORCHESTRATED for some specific time
+ * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+ * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
*/
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
throws ExecutionException, InterruptedException {
@@ -562,10 +563,14 @@ public class DagManager extends AbstractIdleService {
return false;
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
- long timeoutForFlowStart = DagManagerUtils.getFlowStartSLA(node);
- long flowStartTime = jobStatus.getFlowExecutionId();
-
- if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - flowStartTime > timeoutForFlowStart) {
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
+ long jobStartTime = jobStatus.getStartTime();
+
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobStartTime > timeOutForJobStart) {
+ log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
+ DagManagerUtils.getJobName(node),
+ DagManagerUtils.getFullyQualifiedDagName(node),
+ timeOutForJobStart);
cancelDagNode(node);
return true;
} else {
@@ -695,6 +700,10 @@ public class DagManager extends AbstractIdleService {
getRunningJobsCounter(dagNode).inc();
}
+ addSpecFuture.get();
+
+ jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
+
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 08c6c8f..98e6fa4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -47,7 +46,11 @@ public class DagManagerUtils {
static long NO_SLA = -1L;
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
- Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ return getFlowId(dag.getStartNodes().get(0));
+ }
+
+ static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -113,6 +116,17 @@ public class DagManagerUtils {
return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
}
+ /**
+ * Returns a fully-qualified {@link Dag} name that includes: (flowGroup, flowName, flowExecutionId).
+ * @param dagNode
+ * @return fully qualified name of the underlying {@link Dag}.
+ */
+ static String getFullyQualifiedDagName(DagNode<JobExecutionPlan> dagNode) {
+ FlowId flowid = getFlowId(dagNode);
+ long flowExecutionId = getFlowExecId(dagNode);
+ return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
+ }
+
static String getJobName(DagNode<JobExecutionPlan> dagNode) {
return dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
}
@@ -239,19 +253,19 @@ public class DagManagerUtils {
}
/**
- * get the flow start sla from the dag node config.
+ * get the job start sla from the dag node config.
* if time unit is not provided, it assumes time unit is minute.
* @param dagNode dag node for which flow start sla is to be retrieved
- * @return flow start sla in ms
+ * @return job start sla in ms
*/
- static long getFlowStartSLA(DagNode<JobExecutionPlan> dagNode) {
+ static long getJobStartSla(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
- jobConfig, ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT));
+ jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT));
- return slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
- ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
- : ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA);
+ return slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+ ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+ : ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME);
}
static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 9cc3674..c670d65 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -131,11 +131,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
break;
case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+ properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
case TimingEvent.LauncherTimings.JOB_PREPARE:
case TimingEvent.LauncherTimings.JOB_START:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
- properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ae9877f..edc881f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -226,8 +226,8 @@ public class DagManagerFlowTest {
// change config to set a small sla
Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
jobConfig = jobConfig
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
// mock add spec
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c250a49..04f48fb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -139,7 +139,7 @@ public class DagManagerTest {
private static Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId, String jobGroup, String jobName, String eventName, boolean shouldRetry) {
return Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
- message("Test message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
+ message("Test message").eventName(eventName).startTime(flowExecutionId + 10).shouldRetry(shouldRetry).build());
}
@Test