You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/19 03:58:24 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1090] send compiled_skip metrics

This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 65e2a64  [GOBBLIN-1090] send compiled_skip metrics
65e2a64 is described below

commit 65e2a64d2e40f46203837f4785c2426eae29a12e
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Mar 18 20:58:13 2020 -0700

    [GOBBLIN-1090] send compiled_skip metrics
    
    Closes #2931 from arjun4084346/compiledMetrics
---
 .../service/modules/orchestration/Orchestrator.java   | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7a1b611..b7b6593 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -225,10 +225,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
-      TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
-          ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
-          : null;
-
       Config flowConfig = ((FlowSpec) spec).getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -247,12 +243,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
             + "concurrent executions are disabled for this flow.", flowGroup, flowName);
-        flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
+        flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
         return;
-      } else {
-        flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
       }
 
+      TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
+          ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
+          : null;
+
       Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
 
       Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -265,11 +263,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get()
             .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : null;
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+        flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
         if (flowCompileFailedTimer != null) {
           flowCompileFailedTimer.stop(flowMetadata);
         }
         return;
+      } else {
+        flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
       }
 
       //If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is successful,
@@ -281,7 +282,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         flowCompilationTimer.stop(flowMetadata);
       }
 
-
       if (this.dagManager.isPresent()) {
         //Send the dag to the DagManager.
         this.dagManager.get().addDag(jobExecutionPlanDag, true);
@@ -406,7 +406,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   private enum CompiledState {
     FAILED(-1),
     UNKNOWN(0),
-    SUCCESSFUL(1);
+    SUCCESSFUL(1),
+    SKIPPED(2);
 
     public int value;