You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/07/10 21:44:58 UTC
[gobblin] branch master updated: [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2c2ffac77 [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
2c2ffac77 is described below
commit 2c2ffac77fed07bcc6486f9dee8cd60e41650ef7
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 10 17:44:53 2023 -0400
[GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
* Add tags to dagmanager metrics for extensibility
* Fix concurrency bug in test
* Add job level metrics in dagmanager test
* Test not cleaning dm threads
* Only clean up metrics if the threads were started by the DagManager
---
.../org/apache/gobblin/metrics/MetricTagNames.java | 22 ++++++++++++++++++++++
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/DagManagerMetrics.java | 15 ++++++++++++++-
.../modules/orchestration/DagManagerTest.java | 14 +++++++++-----
4 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java
new file mode 100644
index 000000000..bce085d2b
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+public class MetricTagNames {
+ public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation";
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 80da8a9e9..acbf9f71d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -230,7 +230,7 @@ public class DagManager extends AbstractIdleService {
} else {
this.eventSubmitter = Optional.absent();
}
- this.dagManagerMetrics = new DagManagerMetrics(metricContext);
+ this.dagManagerMetrics = new DagManagerMetrics();
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index e9e605b06..a5f34cff7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -28,16 +28,22 @@ import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricTagNames;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
@@ -75,6 +81,13 @@ public class DagManagerMetrics {
this.metricContext = metricContext;
}
+ public DagManagerMetrics() {
+ // Create a new metric context for the DagManagerMetrics tagged appropriately
+ List<Tag<?>> tags = new ArrayList<>();
+ tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags);
+ }
+
public void activate() {
if (this.metricContext != null) {
allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -249,7 +262,7 @@ public class DagManagerMetrics {
public void cleanup() {
// Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
- if(this.metricContext != null) {
+ if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 0a572cf26..2babd0683 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -709,6 +709,9 @@ public class DagManagerTest {
@Test (dependsOnMethods = "testResumeCancelledDag")
public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
+ String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 0 :
+ metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount();
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
String flowGroup = "group" + flowGroupId;
@@ -780,8 +783,7 @@ public class DagManagerTest {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));
- String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
- Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1);
// Cleanup
this._dagManagerThread.run();
@@ -1162,10 +1164,13 @@ public class DagManagerTest {
Config executorOneConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
+ .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
Config executorTwoConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
+ .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
+
List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
@@ -1229,7 +1234,6 @@ public class DagManagerTest {
String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS);
- String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
// Cleanup