You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/12 16:52:12 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-802] change gauge metrics context to RootMetricsContext

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bed94a  [GOBBLIN-802] change gauge metrics context to RootMetricsContext
6bed94a is described below

commit 6bed94a0f213ef64b225bb05b80ab338c25bbc17
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Jun 12 09:52:05 2019 -0700

    [GOBBLIN-802] change gauge metrics context to RootMetricsContext
    
    Closes #2668 from arjun4084346/metric-fix
---
 .../org/apache/gobblin/metrics/MetricContext.java  |  1 +
 .../modules/orchestration/Orchestrator.java        | 43 +++++++++++++++++-----
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index c712c4d..7a851e4 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -508,6 +508,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
 
   /**
    * Create a new {@link ContextAwareGauge} wrapping a given {@link com.codahale.metrics.Gauge}.
+   * Unlike other metrics, gauges are supposed to be registered by the caller.
    *
    * @param name name of the {@link ContextAwareGauge}
    * @param gauge the {@link com.codahale.metrics.Gauge} to be wrapped by the {@link ContextAwareGauge}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c371b51..d8d5480 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -34,6 +34,7 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nonnull;
@@ -45,7 +46,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -98,6 +101,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+
   public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
       boolean instrumentationEnabled) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -223,25 +228,28 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
           ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
           : null;
 
-      //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
-      //running. If so, return immediately.
       Config flowConfig = ((FlowSpec) spec).getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      if (!flowGauges.containsKey(spec.getUri().toString())) {
+        String flowCompiledGaugeName = MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+        flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+        ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> flowGauges.get(spec.getUri().toString()).state.value);
+        RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+      }
+
+      //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
+      //running. If so, return immediately.
       boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
 
       if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
             + "concurrent executions are disabled for this flow.", flowGroup, flowName);
-        // We send a gauge with value 0 signifying that the flow could not be compiled because previous execution is already running
-        metricContext.newContextAwareGauge(
-            MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED),
-            () -> 0L);
+        flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
         return;
       } else {
-        metricContext.newContextAwareGauge(
-            MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED),
-            () -> 1L);
+        flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
       }
 
       Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
@@ -404,4 +412,21 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   public void switchMetricContext(MetricContext context) {
     throw new UnsupportedOperationException();
   }
+
+  @Setter
+  private static class FlowCompiledState {
+    private CompiledState state = CompiledState.UNKNOWN;
+  }
+
+  private enum CompiledState {
+    FAILED(-1),
+    UNKNOWN(0),
+    SUCCESSFUL(1);
+
+    public int value;
+
+    CompiledState(int value) {
+      this.value = value;
+    }
+  }
 }
\ No newline at end of file