You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/11/15 19:11:45 UTC

(gobblin) branch master updated: Emit metric to tune LeaseArbiter Linger metric (#3824)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dd17bed43 Emit metric to tune LeaseArbiter Linger metric  (#3824)
dd17bed43 is described below

commit dd17bed4367addd9c68608e2795ffae2d08434b6
Author: umustafi <um...@gmail.com>
AuthorDate: Wed Nov 15 11:11:39 2023 -0800

    Emit metric to tune LeaseArbiter Linger metric  (#3824)
    
    * Monitor the number of leases that fail to persist, in order to tune linger
    
    * Increase default linger value and decrease default epsilon value
    
    * Add metric for lease persisting success
    
    * Rename metrics
    
    ---------
    
    Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
 .../gobblin/configuration/ConfigurationKeys.java       |  4 ++--
 .../org/apache/gobblin/metrics/ServiceMetricNames.java | 18 ++++++++++--------
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java      | 18 ++++++++----------
 .../modules/orchestration/FlowTriggerHandler.java      |  6 ++++++
 4 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 5fe8f001a..f838fc606 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -117,10 +117,10 @@ public class ConfigurationKeys {
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
   public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
   public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
-  public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
+  public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
   // Note: linger should be on the order of seconds even though we measure in millis
   public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
-  public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+  public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 90000;
   public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
   public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;
 
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 5550513c4..866644f50 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -36,14 +36,16 @@ public class ServiceMetricNames {
   public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay";
 
   // Flow Trigger Handler
-  public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler";
-  public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
-  public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
-  public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
-  public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
-  public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
-  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
-  public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount";
+  public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler.";
+  public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + "numFlowsSubmitted";
+  public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leaseObtained";
+  public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasedToAnother";
+  public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "noLongerLeasing";
+  public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "jobDoesNotExistInScheduler";
+  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToSetReminderCount";
+  public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasesObtainedDueToReminderCount";
+  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToRecordLeaseSuccessCount";
+  public static final String FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "recordedLeaseSuccessCount";
 
   // DagManager Related Metrics
   public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 925490cd9..f514e4c01 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -244,10 +244,9 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
     // Query lease arbiter table about this flow action
     Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis);
 
-    // TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue
     try {
       if (!getResult.isPresent()) {
-        log.info("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
+        log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
             + " then go ahead and insert", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
         int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.empty(), isReminderEvent);
@@ -266,7 +265,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
       // because db laundering tells us that the currently worked on db event is newer and will have its own reminders
       if (isReminderEvent) {
         if (eventTimeMillis < dbEventTimestamp.getTime()) {
-          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
                   + "is being worked on, so this older reminder will be dropped.", flowAction,
               isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp);
           return new NoLongerLeasingStatus();
@@ -279,8 +278,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
               isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime());
         }
         if (eventTimeMillis == dbEventTimestamp.getTime()) {
-          // TODO: change this to a debug after fixing issue
-          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
                   + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
               eventTimeMillis, dbEventTimestamp);
         }
@@ -294,21 +292,21 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
           DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
-          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
               updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
           return new LeasedToAnotherStatus(updatedFlowAction,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
         }
         DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
-        log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
             updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
         return new LeasedToAnotherStatus(updatedFlowAction,
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
-        log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
             + "whether same or distinct event)", flowAction, isReminderEvent ? "reminder" : "original",
             dbCurrentTimestamp.getTime());
         if (isWithinEpsilon && !isReminderEvent) {
@@ -322,11 +320,11 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
         return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp), isReminderEvent);
       } // No longer leasing this event
         if (isWithinEpsilon) {
-          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
               + " in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
             + "event in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp
         int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index af65390ec..a1b141d95 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -80,6 +80,8 @@ public class FlowTriggerHandler {
   private ContextAwareCounter jobDoesNotExistInSchedulerCount;
   private ContextAwareCounter failedToSetEventReminderCount;
   private ContextAwareMeter leasesObtainedDueToReminderCount;
+  private ContextAwareMeter failedToRecordLeaseSuccessCount;
+  private ContextAwareMeter recordedLeaseSuccessCount;
 
   @Inject
   public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
@@ -98,6 +100,8 @@ public class FlowTriggerHandler {
     this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
     this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
     this.leasesObtainedDueToReminderCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
+    this.failedToRecordLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT);
+    this.recordedLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT);
   }
 
   /**
@@ -127,8 +131,10 @@ public class FlowTriggerHandler {
         if (persistFlowAction(leaseObtainedStatus)) {
           log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
               leaseObtainedStatus.getEventTimeMillis());
+          this.recordedLeaseSuccessCount.mark();
           return;
         }
+        this.failedToRecordLeaseSuccessCount.mark();
         // If persisting the flow action failed, then we set another trigger for this event to occur immediately to
         // re-attempt handling the event
         scheduleReminderForEvent(jobProps,