You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/19 03:58:24 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1090] send compiled_skip metrics
This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 65e2a64 [GOBBLIN-1090] send compiled_skip metrics
65e2a64 is described below
commit 65e2a64d2e40f46203837f4785c2426eae29a12e
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Mar 18 20:58:13 2020 -0700
[GOBBLIN-1090] send compiled_skip metrics
Closes #2931 from arjun4084346/compiledMetrics
---
.../service/modules/orchestration/Orchestrator.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7a1b611..b7b6593 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -225,10 +225,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
- TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
- ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
- : null;
-
Config flowConfig = ((FlowSpec) spec).getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -247,12 +243,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
- flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
+ flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
return;
- } else {
- flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
+ TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
+ ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
+ : null;
+
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -265,11 +263,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get()
.getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : null;
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
_log.warn("Cannot determine an executor to run on for Spec: " + spec);
if (flowCompileFailedTimer != null) {
flowCompileFailedTimer.stop(flowMetadata);
}
return;
+ } else {
+ flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
//If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is successful,
@@ -281,7 +282,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
flowCompilationTimer.stop(flowMetadata);
}
-
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true);
@@ -406,7 +406,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private enum CompiledState {
FAILED(-1),
UNKNOWN(0),
- SUCCESSFUL(1);
+ SUCCESSFUL(1),
+ SKIPPED(2);
public int value;