You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/11 00:47:05 UTC
[gobblin] branch master updated: [GOBBLIN-1466] Make meters shared between DagManagerThreads
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f74c232 [GOBBLIN-1466] Make meters shared between DagManagerThreads
f74c232 is described below
commit f74c232ad05b3b3ca0fcecd4789657bd81af9e42
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Jun 10 17:46:57 2021 -0700
[GOBBLIN-1466] Make meters shared between DagManagerThreads
Closes #3306 from jack-moseley/shared-meters
---
.../service/modules/orchestration/DagManager.java | 39 ++++++++++++++--------
.../modules/orchestration/DagManagerTest.java | 8 ++++-
2 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 821d87e..5c6dd1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -364,11 +364,22 @@ public class DagManager extends AbstractIdleService {
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
+ ContextAwareMeter allSuccessfulMeter = null;
+ ContextAwareMeter allFailedMeter = null;
+ if (instrumentationEnabled) {
+ MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+ allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.FAILED_FLOW_METER));
+ }
+
//On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
+ queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
+ allSuccessfulMeter, allFailedMeter);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
@@ -420,9 +431,11 @@ public class DagManager extends AbstractIdleService {
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
- private static Map<String, FlowState> flowGauges = Maps.newHashMap();
- private ContextAwareMeter allSuccessfulMeter;
- private ContextAwareMeter allFailedMeter;
+ private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
+ private final ContextAwareMeter allSuccessfulMeter;
+ private final ContextAwareMeter allFailedMeter;
+ private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
+ private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -436,7 +449,8 @@ public class DagManager extends AbstractIdleService {
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds) {
+ boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
+ ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -446,16 +460,14 @@ public class DagManager extends AbstractIdleService {
this.resumeQueue = resumeQueue;
this.defaultQuota = defaultQuota;
this.perUserQuota = perUserQuota;
+ this.allSuccessfulMeter = allSuccessfulMeter;
+ this.allFailedMeter = allFailedMeter;
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
() -> orchestrationDelay.get());
- this.allSuccessfulMeter = metricContext.contextAwareMeter(
- MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER));
- this.allFailedMeter = metricContext.contextAwareMeter(
- MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FAILED_FLOW_METER));
this.metricContext.register(orchestrationDelayMetric);
} else {
this.metricContext = null;
@@ -1131,9 +1143,10 @@ public class DagManager extends AbstractIdleService {
return counters;
}
- private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName) {
+ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Map<String, ContextAwareMeter> meterMap) {
String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
- return metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, meterName));
+ return meterMap.computeIfAbsent(flowGroup,
+ group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
}
/**
@@ -1184,7 +1197,7 @@ public class DagManager extends AbstractIdleService {
if (this.metricContext != null) {
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
this.allSuccessfulMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark();
+ getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
}
}
@@ -1193,7 +1206,7 @@ public class DagManager extends AbstractIdleService {
if (this.metricContext != null) {
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
this.allFailedMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER).mark();
+ getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 7749903..40c17df 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -47,6 +47,9 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -57,6 +60,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
public class DagManagerTest {
@@ -84,8 +88,10 @@ public class DagManagerTest {
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
this.resumeQueue = new LinkedBlockingQueue<>();
+ MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, failedDagStateStore, queue, cancelQueue,
- resumeQueue, true, 5, new HashMap<>(), new HashSet<>());
+ resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
+ metricContext.contextAwareMeter("failedMeter"));
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);