You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/12 18:14:14 UTC

[gobblin] branch master updated: [GOBBLIN-1509] Announce flow failure on DagManager::addDag error (#3357)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f54d16c  [GOBBLIN-1509] Announce flow failure on DagManager::addDag error (#3357)
f54d16c is described below

commit f54d16ca9397f41c588d7e18cd8e8524ab253718
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Thu Aug 12 11:14:10 2021 -0700

    [GOBBLIN-1509] Announce flow failure on DagManager::addDag error (#3357)
    
    Announce flow failure on DagManager::addDag error
    
    Additionally, migrate Orchestrator entirely away from the deprecated EventSubmitter::getTimingEvent factory method.
    
    Presently, an addDag failure leaves the flow marooned in the COMPILED state, because the warranted FLOW_FAILED event is never sent. Particularly insidious: a scheduled flow whose execution is stuck in COMPILED misses its next execution unless flow.allowConcurrentExecution is set. Thus the scheduled flow is stuck in its entirety, not merely a single execution.
    
    One observed cause of addDag failure is when the DagStateStore is backed by a replicated DB (e.g. MySqlDagStateStore) that has just switched leaders. Cached connections in the pool may suddenly point to a read-only follower, which is unable to perform DagStateStore::writeCheckpoint.
---
 .../modules/orchestration/Orchestrator.java        | 41 +++++++++++++---------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 9058fb0..b4ea001 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -245,14 +245,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
             + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
         if (this.eventSubmitter.isPresent()) {
-          this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+          new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
         }
         return;
       }
 
-      TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
-          ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
-          : null;
+      Optional<TimingEvent> flowCompilationTimer = this.eventSubmitter.transform(submitter ->
+          new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
 
       Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
 
@@ -270,13 +269,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         }
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
 
-        TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get()
-            .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : null;
+        Optional<TimingEvent> flowCompileFailedTimer = this.eventSubmitter.transform(submitter ->
+            new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-        if (flowCompileFailedTimer != null) {
-          flowCompileFailedTimer.stop(flowMetadata);
+        if (flowCompileFailedTimer.isPresent()) {
+          flowCompileFailedTimer.get().stop(flowMetadata);
         }
         return;
       } else {
@@ -288,13 +287,23 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
-      if (flowCompilationTimer != null) {
-        flowCompilationTimer.stop(flowMetadata);
+      if (flowCompilationTimer.isPresent()) {
+        flowCompilationTimer.get().stop(flowMetadata);
       }
 
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        try {
+          //Send the dag to the DagManager.
+          this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        } catch (Exception ex) {
+          if (this.eventSubmitter.isPresent()) {
+            // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
+            String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
+            flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+            new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+          }
+          throw ex;
+        }
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
@@ -314,13 +323,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
             Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
             _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
 
-            TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
-                getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
+            Optional<TimingEvent> jobOrchestrationTimer = this.eventSubmitter.transform(submitter ->
+                new TimingEvent(submitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
 
             producer.addSpec(jobSpec);
 
-            if (jobOrchestrationTimer != null) {
-              jobOrchestrationTimer.stop(jobMetadata);
+            if (jobOrchestrationTimer.isPresent()) {
+              jobOrchestrationTimer.get().stop(jobMetadata);
             }
           } catch (Exception e) {
             _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer