You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/11 00:47:05 UTC

[gobblin] branch master updated: [GOBBLIN-1466] Make meters shared between DagManagerThreads

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f74c232  [GOBBLIN-1466] Make meters shared between DagManagerThreads
f74c232 is described below

commit f74c232ad05b3b3ca0fcecd4789657bd81af9e42
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Jun 10 17:46:57 2021 -0700

    [GOBBLIN-1466] Make meters shared between DagManagerThreads
    
    Closes #3306 from jack-moseley/shared-meters
---
 .../service/modules/orchestration/DagManager.java  | 39 ++++++++++++++--------
 .../modules/orchestration/DagManagerTest.java      |  8 ++++-
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 821d87e..5c6dd1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -364,11 +364,22 @@ public class DagManager extends AbstractIdleService {
         this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
         Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
 
+        ContextAwareMeter allSuccessfulMeter = null;
+        ContextAwareMeter allFailedMeter = null;
+        if (instrumentationEnabled) {
+          MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+          allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+          allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.FAILED_FLOW_METER));
+        }
+
         //On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
+              queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
+              allSuccessfulMeter, allFailedMeter);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -420,9 +431,11 @@ public class DagManager extends AbstractIdleService {
     private final int defaultQuota;
     private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
-    private static Map<String, FlowState> flowGauges = Maps.newHashMap();
-    private ContextAwareMeter allSuccessfulMeter;
-    private ContextAwareMeter allFailedMeter;
+    private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
+    private final ContextAwareMeter allSuccessfulMeter;
+    private final ContextAwareMeter allFailedMeter;
+    private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
+    private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -436,7 +449,8 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds) {
+        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
+        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -446,16 +460,14 @@ public class DagManager extends AbstractIdleService {
       this.resumeQueue = resumeQueue;
       this.defaultQuota = defaultQuota;
       this.perUserQuota = perUserQuota;
+      this.allSuccessfulMeter = allSuccessfulMeter;
+      this.allFailedMeter = allFailedMeter;
       if (instrumentationEnabled) {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
         this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
         this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
         ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
             () -> orchestrationDelay.get());
-        this.allSuccessfulMeter = metricContext.contextAwareMeter(
-            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER));
-        this.allFailedMeter = metricContext.contextAwareMeter(
-            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FAILED_FLOW_METER));
         this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
@@ -1131,9 +1143,10 @@ public class DagManager extends AbstractIdleService {
       return counters;
     }
 
-    private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName) {
+    private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Map<String, ContextAwareMeter> meterMap) {
       String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
-      return metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, meterName));
+      return meterMap.computeIfAbsent(flowGroup,
+          group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
     }
 
     /**
@@ -1184,7 +1197,7 @@ public class DagManager extends AbstractIdleService {
       if (this.metricContext != null) {
         flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
         this.allSuccessfulMeter.mark();
-        getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
       }
     }
 
@@ -1193,7 +1206,7 @@ public class DagManager extends AbstractIdleService {
       if (this.metricContext != null) {
         flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
         this.allFailedMeter.mark();
-        getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER).mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
       }
     }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 7749903..40c17df 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -47,6 +47,9 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -57,6 +60,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 public class DagManagerTest {
@@ -84,8 +88,10 @@ public class DagManagerTest {
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
     this.resumeQueue = new LinkedBlockingQueue<>();
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
     this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, failedDagStateStore, queue, cancelQueue,
-        resumeQueue, true, 5, new HashMap<>(), new HashSet<>());
+        resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
+        metricContext.contextAwareMeter("failedMeter"));
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);