You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/13 22:17:06 UTC
[gobblin] branch master updated: [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ffcd2d9dc [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
ffcd2d9dc is described below
commit ffcd2d9dcf5d46c79db5bdafe894f4ce72d31aff
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Apr 13 15:17:00 2022 -0700
[GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
* address comments
* use ConnectionManager when HttpClient is not closable
* [GOBBLIN-1631]Emit heartbeat for dagManagerThread
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../gobblin/service/modules/orchestration/DagManager.java | 10 ++++++++--
.../gobblin/service/modules/orchestration/DagManagerTest.java | 2 +-
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2d89591e1..dfa231989 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
@@ -135,6 +136,7 @@ public class DagManager extends AbstractIdleService {
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+ public static final String DAG_MANAGER_HEARTBEAT = "gobblin.dagManager.heartbeat-%s";
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
@@ -379,7 +381,7 @@ public class DagManager extends AbstractIdleService {
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
- allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager);
+ allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
@@ -448,13 +450,14 @@ public class DagManager extends AbstractIdleService {
private final BlockingQueue<String> cancelQueue;
private final BlockingQueue<String> resumeQueue;
private final Long defaultJobStartSlaTimeMillis;
+ private final Optional<Meter> dagManagerThreadHeartbeat;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager) {
+ ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -474,10 +477,12 @@ public class DagManager extends AbstractIdleService {
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelay::get);
this.metricContext.register(orchestrationDelayMetric);
+ this.dagManagerThreadHeartbeat = Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT, dagMangerThreadId)));
} else {
this.metricContext = null;
this.eventSubmitter = Optional.absent();
this.jobStatusPolledTimer = Optional.absent();
+ this.dagManagerThreadHeartbeat = Optional.absent();
}
}
@@ -522,6 +527,7 @@ public class DagManager extends AbstractIdleService {
log.debug("Cleaning up finished dags..");
cleanUp();
log.debug("Clean up done");
+ Instrumented.markMeter(dagManagerThreadHeartbeat);
} catch (Exception e) {
log.error(String.format("Exception encountered in %s", getClass().getName()), e);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 9daad5fbc..e09f5bbbe 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -101,7 +101,7 @@ public class DagManagerTest {
this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager);
+ metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);