You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/06/12 16:52:12 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-802] change gauge metrics context to RootMetricsContext
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6bed94a [GOBBLIN-802] change gauge metrics context to RootMetricsContext
6bed94a is described below
commit 6bed94a0f213ef64b225bb05b80ab338c25bbc17
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Jun 12 09:52:05 2019 -0700
[GOBBLIN-802] change gauge metrics context to RootMetricsContext
Closes #2668 from arjun4084346/metric-fix
---
.../org/apache/gobblin/metrics/MetricContext.java | 1 +
.../modules/orchestration/Orchestrator.java | 43 +++++++++++++++++-----
2 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index c712c4d..7a851e4 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -508,6 +508,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
/**
* Create a new {@link ContextAwareGauge} wrapping a given {@link com.codahale.metrics.Gauge}.
+ * Unlike other metrics, gauges are supposed to be registered by the caller.
*
* @param name name of the {@link ContextAwareGauge}
* @param gauge the {@link com.codahale.metrics.Gauge} to be wrapped by the {@link ContextAwareGauge}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c371b51..d8d5480 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -34,6 +34,7 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
@@ -45,7 +46,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -98,6 +101,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private final ClassAliasResolver<SpecCompiler> aliasResolver;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
boolean instrumentationEnabled) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -223,25 +228,28 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
: null;
- //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
- //running. If so, return immediately.
Config flowConfig = ((FlowSpec) spec).getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+ if (!flowGauges.containsKey(spec.getUri().toString())) {
+ String flowCompiledGaugeName = MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+ flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+ ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> flowGauges.get(spec.getUri().toString()).state.value);
+ RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+ }
+
+ //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
+ //running. If so, return immediately.
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
- // We send a gauge with value 0 signifying that the flow could not be compiled because previous execution is already running
- metricContext.newContextAwareGauge(
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED),
- () -> 0L);
+ flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
return;
} else {
- metricContext.newContextAwareGauge(
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED),
- () -> 1L);
+ flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
@@ -404,4 +412,21 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
}
+
+ @Setter // presumably Lombok's @Setter: supplies the setState(...) that this patch calls on flowGauges entries
+ private static class FlowCompiledState { // mutable per-flow holder; the registered gauge lambda reads state.value from it
+ private CompiledState state = CompiledState.UNKNOWN; // stays UNKNOWN until the canRun(...) check sets FAILED or SUCCESSFUL
+ }
+
+ private enum CompiledState { // states reported through the per-flow ServiceMetricNames.COMPILED gauge
+ FAILED(-1), // set when canRun(...) returns false and the flow execution is skipped
+ UNKNOWN(0), // initial state of a newly created FlowCompiledState
+ SUCCESSFUL(1); // set when canRun(...) passes; NOTE(review): assigned before specCompiler.compileFlow(spec) runs, so it does not reflect the compile result itself
+
+ public int value; // integer the gauge emits for this state; NOTE(review): public and non-final, though nothing in this patch reassigns it
+
+ CompiledState(int value) {
+ this.value = value;
+ }
+ }
}
\ No newline at end of file