You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/07/21 16:26:44 UTC

[gobblin] branch master updated: [GOBBLIN-1856] Add flow trigger handler leasing metrics (#3717)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0151890d1 [GOBBLIN-1856] Add flow trigger handler leasing metrics (#3717)
0151890d1 is described below

commit 0151890d1f5274fb86827074a755e54adf189ba0
Author: meethngala <me...@gmail.com>
AuthorDate: Fri Jul 21 09:26:36 2023 -0700

    [GOBBLIN-1856] Add flow trigger handler leasing metrics (#3717)
    
    * add flow trigger handler leasing metrics
    
    * remove todo and resolve merge conflicts
    
    * address PR comments
    
    ---------
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
 .../apache/gobblin/metrics/ServiceMetricNames.java    |  5 +++++
 .../modules/orchestration/FlowTriggerHandler.java     | 19 ++++++++++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 94e2a82c6..9e9ab1e69 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -34,6 +34,11 @@ public class ServiceMetricNames {
   public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
   public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";
 
+  // Flow Trigger Handler Lease Status Counts
+  public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained";
+  public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother";
+  public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing";
+
   //Job status poll timer
   public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index ec63276c4..7fdee0384 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -38,8 +38,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
@@ -69,6 +71,12 @@ public class FlowTriggerHandler {
   private MetricContext metricContext;
   private ContextAwareMeter numFlowsSubmitted;
 
+  private ContextAwareCounter leaseObtainedCount;
+
+  private ContextAwareCounter leasedToAnotherStatusCount;
+
+  private ContextAwareCounter noLongerLeasingStatusCount;
+
   @Inject
   public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
       SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
@@ -80,6 +88,9 @@ public class FlowTriggerHandler {
     this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
         this.getClass());
     this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
+    this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
+    this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
+    this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
   }
 
   /**
@@ -94,10 +105,12 @@ public class FlowTriggerHandler {
       throws IOException {
     if (multiActiveLeaseArbiter.isPresent()) {
       MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
-      // TODO: add a log event or metric for each of these cases
       if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
         MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+        this.leaseObtainedCount.inc();
         if (persistFlowAction(leaseObtainedStatus)) {
+          log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
+              leaseObtainedStatus.getEventTimestamp());
           return;
         }
         // If persisting the flow action failed, then we set another trigger for this event to occur immediately to
@@ -107,10 +120,14 @@ public class FlowTriggerHandler {
             eventTimeMillis);
         return;
       } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+        this.leasedToAnotherStatusCount.inc();
         scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
             eventTimeMillis);
         return;
       } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+        this.noLongerLeasingStatusCount.inc();
+        log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
+            eventTimeMillis);
         return;
       }
       throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",