You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/25 22:45:14 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 10b7333  [GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]
10b7333 is described below

commit 10b73339187e85c03924143d923699e9bfc9b770
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Nov 25 14:45:06 2019 -0800

    [GOBBLIN-978] Use job start time instead of flow start time to kill jobs stuck in ORCHESTRATED state[]
    
    Closes #2825 from sv2000/killOrphanedJob
---
 .../gobblin/configuration/ConfigurationKeys.java   |  8 +++---
 .../apache/gobblin/runtime/api/SpecProducer.java   |  4 +++
 .../service/modules/orchestration/DagManager.java  | 19 +++++++++----
 .../modules/orchestration/DagManagerUtils.java     | 32 ++++++++++++++++------
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  2 +-
 .../modules/orchestration/DagManagerFlowTest.java  |  4 +--
 .../modules/orchestration/DagManagerTest.java      |  2 +-
 7 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index bc810a6..79d1280 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -916,10 +916,10 @@ public class ConfigurationKeys {
   public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
   public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
   public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
-  public static final String GOBBLIN_FLOW_START_SLA_TIME = "gobblin.flow.start.sla.time";
-  public static final String GOBBLIN_FLOW_START_SLA_TIME_UNIT = "gobblin.flow.start.sla.timeunit";
-  public static final long DEFAULT_GOBBLIN_FLOW_START_SLA = 10L;
-  public static final String DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT = "MINUTES";
+  public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time";
+  public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit";
+  public static final long DEFAULT_GOBBLIN_JOB_START_SLA_TIME = 10L;
+  public static final String DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT = "MINUTES";
   public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
   public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 790e1f4..558827f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -50,6 +50,10 @@ public interface SpecProducer<V> {
   /** List all {@link Spec} being executed on {@link SpecExecutor}. */
   Future<? extends List<V>> listSpecs();
 
+  default String getExecutionLink(Future<?> future, String specExecutorUri) {
+    return "";
+  }
+
   default String serializeAddSpecResponse(Future<?> response) {
     return "";
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a5bcd4a..048365e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -554,7 +554,8 @@ public class DagManager extends AbstractIdleService {
      * {@link ExecutionStatus} for some specific amount of time.
      * @param node {@link DagNode} representing the job
      * @param jobStatus current {@link JobStatus} of the job
-     * @return true if the job status remains ORCHESTRATED for some specific time
+     * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+     * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
      */
     private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
         throws ExecutionException, InterruptedException {
@@ -562,10 +563,14 @@ public class DagManager extends AbstractIdleService {
         return false;
       }
       ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
-      long timeoutForFlowStart = DagManagerUtils.getFlowStartSLA(node);
-      long flowStartTime = jobStatus.getFlowExecutionId();
-
-      if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - flowStartTime > timeoutForFlowStart) {
+      long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
+      long jobStartTime = jobStatus.getStartTime();
+
+      if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobStartTime > timeOutForJobStart) {
+        log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
+            DagManagerUtils.getJobName(node),
+            DagManagerUtils.getFullyQualifiedDagName(node),
+            timeOutForJobStart);
         cancelDagNode(node);
         return true;
       } else {
@@ -695,6 +700,10 @@ public class DagManager extends AbstractIdleService {
           getRunningJobsCounter(dagNode).inc();
         }
 
+        addSpecFuture.get();
+
+        jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
+
         if (jobOrchestrationTimer != null) {
           jobOrchestrationTimer.stop(jobMetadata);
         }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 08c6c8f..98e6fa4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -47,7 +46,11 @@ public class DagManagerUtils {
   static long NO_SLA = -1L;
 
   static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
-    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    return getFlowId(dag.getStartNodes().get(0));
+  }
+
+  static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
     return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -113,6 +116,17 @@ public class DagManagerUtils {
     return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
   }
 
+  /**
+   * Returns a fully-qualified {@link Dag} name that includes: (flowGroup, flowName, flowExecutionId).
+   * @param dagNode
+   * @return fully qualified name of the underlying {@link Dag}.
+   */
+  static String getFullyQualifiedDagName(DagNode<JobExecutionPlan> dagNode) {
+    FlowId flowid = getFlowId(dagNode);
+    long flowExecutionId = getFlowExecId(dagNode);
+    return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
+  }
+
   static String getJobName(DagNode<JobExecutionPlan> dagNode) {
     return dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
   }
@@ -239,19 +253,19 @@ public class DagManagerUtils {
   }
 
   /**
-   * get the flow start sla from the dag node config.
+   * get the job start sla from the dag node config.
    * if time unit is not provided, it assumes time unit is minute.
    * @param dagNode dag node for which flow start sla is to be retrieved
-   * @return flow start sla in ms
+   * @return job start sla in ms
    */
-  static long getFlowStartSLA(DagNode<JobExecutionPlan> dagNode) {
+  static long getJobStartSla(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
-        jobConfig, ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT));
+        jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT));
 
-    return slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
-        ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
-        : ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA);
+    return slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+        ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+        : ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME);
   }
 
   static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 9cc3674..c670d65 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -131,11 +131,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
         break;
       case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.LauncherTimings.JOB_PREPARE:
       case TimingEvent.LauncherTimings.JOB_START:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
-        properties.put(TimingEvent.JOB_START_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
       case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ae9877f..edc881f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -226,8 +226,8 @@ public class DagManagerFlowTest {
     // change config to set a small sla
     Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
     jobConfig = jobConfig
-        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
-        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+        .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+        .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
     dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
 
     // mock add spec
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c250a49..04f48fb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -139,7 +139,7 @@ public class DagManagerTest {
 
   private static Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String eventName, boolean shouldRetry) {
     return Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
-        message("Test message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
+        message("Test message").eventName(eventName).startTime(flowExecutionId + 10).shouldRetry(shouldRetry).build());
   }
 
   @Test