You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/11/15 19:11:45 UTC
(gobblin) branch master updated: Emit metric to tune LeaseArbiter Linger metric (#3824)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dd17bed43 Emit metric to tune LeaseArbiter Linger metric (#3824)
dd17bed43 is described below
commit dd17bed4367addd9c68608e2795ffae2d08434b6
Author: umustafi <um...@gmail.com>
AuthorDate: Wed Nov 15 11:11:39 2023 -0800
Emit metric to tune LeaseArbiter Linger metric (#3824)
* Monitor number of failed persisting leases to tune linger
* Increase default linger and epsilon values
* Add metric for lease persisting success
* Rename metrics
---------
Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
.../gobblin/configuration/ConfigurationKeys.java | 4 ++--
.../org/apache/gobblin/metrics/ServiceMetricNames.java | 18 ++++++++++--------
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 18 ++++++++----------
.../modules/orchestration/FlowTriggerHandler.java | 6 ++++++
4 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 5fe8f001a..f838fc606 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -117,10 +117,10 @@ public class ConfigurationKeys {
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
- public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
+ public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
// Note: linger should be on the order of seconds even though we measure in millis
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
- public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+ public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 90000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 5550513c4..866644f50 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -36,14 +36,16 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay";
// Flow Trigger Handler
- public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler";
- public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
- public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
- public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
- public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
- public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
- public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
- public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount";
+ public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler.";
+ public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + "numFlowsSubmitted";
+ public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leaseObtained";
+ public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasedToAnother";
+ public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "noLongerLeasing";
+ public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "jobDoesNotExistInScheduler";
+ public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToSetReminderCount";
+ public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasesObtainedDueToReminderCount";
+ public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToRecordLeaseSuccessCount";
+ public static final String FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "recordedLeaseSuccessCount";
// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 925490cd9..f514e4c01 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -244,10 +244,9 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis);
- // TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue
try {
if (!getResult.isPresent()) {
- log.info("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
+ log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action,"
+ " then go ahead and insert", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.empty(), isReminderEvent);
@@ -266,7 +265,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
// because db laundering tells us that the currently worked on db event is newer and will have its own reminders
if (isReminderEvent) {
if (eventTimeMillis < dbEventTimestamp.getTime()) {
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be dropped.", flowAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp);
return new NoLongerLeasingStatus();
@@ -279,8 +278,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime());
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
- // TODO: change this to a debug after fixing issue
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
@@ -294,21 +292,21 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of "
+ "whether same or distinct event)", flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
if (isWithinEpsilon && !isReminderEvent) {
@@ -322,11 +320,11 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp), isReminderEvent);
} // No longer leasing this event
if (isWithinEpsilon) {
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event"
+ " in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing "
+ "event in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index af65390ec..a1b141d95 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -80,6 +80,8 @@ public class FlowTriggerHandler {
private ContextAwareCounter jobDoesNotExistInSchedulerCount;
private ContextAwareCounter failedToSetEventReminderCount;
private ContextAwareMeter leasesObtainedDueToReminderCount;
+ private ContextAwareMeter failedToRecordLeaseSuccessCount;
+ private ContextAwareMeter recordedLeaseSuccessCount;
@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
@@ -98,6 +100,8 @@ public class FlowTriggerHandler {
this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
this.leasesObtainedDueToReminderCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
+ this.failedToRecordLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT);
+ this.recordedLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT);
}
/**
@@ -127,8 +131,10 @@ public class FlowTriggerHandler {
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimeMillis());
+ this.recordedLeaseSuccessCount.mark();
return;
}
+ this.failedToRecordLeaseSuccessCount.mark();
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
scheduleReminderForEvent(jobProps,