You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/13 22:17:06 UTC

[gobblin] branch master updated: [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ffcd2d9dc [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
ffcd2d9dc is described below

commit ffcd2d9dcf5d46c79db5bdafe894f4ce72d31aff
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Apr 13 15:17:00 2022 -0700

    [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
    
    * address comments
    
    * use connectionmanager when httpclient is not cloesable
    
    * [GOBBLIN-1631]Emit heartbeat for dagManagerThread
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../gobblin/service/modules/orchestration/DagManager.java      | 10 ++++++++--
 .../gobblin/service/modules/orchestration/DagManagerTest.java  |  2 +-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2d89591e1..dfa231989 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Meter;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
@@ -135,6 +136,7 @@ public class DagManager extends AbstractIdleService {
   private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
   public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
   public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+  public static final String DAG_MANAGER_HEARTBEAT = "gobblin.dagManager.heartbeat-%s";
   // Default job start SLA time if configured, measured in minutes. Default is 10 minutes
   private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
@@ -379,7 +381,7 @@ public class DagManager extends AbstractIdleService {
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
               runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
-              allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager);
+              allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager, i);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -448,13 +450,14 @@ public class DagManager extends AbstractIdleService {
     private final BlockingQueue<String> cancelQueue;
     private final BlockingQueue<String> resumeQueue;
     private final Long defaultJobStartSlaTimeMillis;
+    private final Optional<Meter> dagManagerThreadHeartbeat;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
         boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
-        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager) {
+        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -474,10 +477,12 @@ public class DagManager extends AbstractIdleService {
         ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
             orchestrationDelay::get);
         this.metricContext.register(orchestrationDelayMetric);
+        this.dagManagerThreadHeartbeat = Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT, dagMangerThreadId)));
       } else {
         this.metricContext = null;
         this.eventSubmitter = Optional.absent();
         this.jobStatusPolledTimer = Optional.absent();
+        this.dagManagerThreadHeartbeat = Optional.absent();
       }
     }
 
@@ -522,6 +527,7 @@ public class DagManager extends AbstractIdleService {
         log.debug("Cleaning up finished dags..");
         cleanUp();
         log.debug("Clean up done");
+        Instrumented.markMeter(dagManagerThreadHeartbeat);
       } catch (Exception e) {
         log.error(String.format("Exception encountered in %s", getClass().getName()), e);
       }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 9daad5fbc..e09f5bbbe 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -101,7 +101,7 @@ public class DagManagerTest {
     this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
     this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
         resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
-        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager);
+        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);