You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/07/10 21:44:58 UTC

[gobblin] branch master updated: [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c2ffac77 [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
2c2ffac77 is described below

commit 2c2ffac77fed07bcc6486f9dee8cd60e41650ef7
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 10 17:44:53 2023 -0400

    [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
    
    * Add tags to dagmanager metrics for extensibility
    
    * Fix concurrency bug in test
    
    * Add job-level metrics in DagManager test
    
    * Test not cleaning up DagManager (DM) threads
    
    * Only clean up metrics if the threads were started by the DagManager
---
 .../org/apache/gobblin/metrics/MetricTagNames.java | 22 ++++++++++++++++++++++
 .../service/modules/orchestration/DagManager.java  |  2 +-
 .../modules/orchestration/DagManagerMetrics.java   | 15 ++++++++++++++-
 .../modules/orchestration/DagManagerTest.java      | 14 +++++++++-----
 4 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java
new file mode 100644
index 000000000..bce085d2b
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+public class MetricTagNames {
+  public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation";
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 80da8a9e9..acbf9f71d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -230,7 +230,7 @@ public class DagManager extends AbstractIdleService {
     } else {
       this.eventSubmitter = Optional.absent();
     }
-    this.dagManagerMetrics = new DagManagerMetrics(metricContext);
+    this.dagManagerMetrics = new DagManagerMetrics();
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index e9e605b06..a5f34cff7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -28,16 +28,22 @@ import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricTagNames;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.RequesterService;
@@ -75,6 +81,13 @@ public class DagManagerMetrics {
     this.metricContext = metricContext;
   }
 
+  public DagManagerMetrics() {
+    // Create a new metric context for the DagManagerMetrics tagged appropriately
+    List<Tag<?>> tags = new ArrayList<>();
+    tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags);
+  }
+
   public void activate() {
     if (this.metricContext != null) {
       allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -249,7 +262,7 @@ public class DagManagerMetrics {
 
   public void cleanup() {
     // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
-    if(this.metricContext != null) {
+    if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
       // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
       // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
       RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 0a572cf26..2babd0683 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -709,6 +709,9 @@ public class DagManagerTest {
 
   @Test (dependsOnMethods = "testResumeCancelledDag")
   public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
+    String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 0 :
+        metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount();
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";
     String flowGroup = "group" + flowGroupId;
@@ -780,8 +783,7 @@ public class DagManagerTest {
     Assert.assertEquals(this.dagToJobs.size(), 1);
     Assert.assertTrue(this.dags.containsKey(dagId1));
 
-    String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
-    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1);
 
     // Cleanup
     this._dagManagerThread.run();
@@ -1162,10 +1164,13 @@ public class DagManagerTest {
     Config executorOneConfig = ConfigFactory.empty()
         .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
         .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
-        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
+        .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
     Config executorTwoConfig = ConfigFactory.empty()
         .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
-        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
+        .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
+
     List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
     dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
 
@@ -1229,7 +1234,6 @@ public class DagManagerTest {
     String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
     String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS);
 
-    String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
     Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
     Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
     // Cleanup