You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/07/21 16:26:44 UTC
[gobblin] branch master updated: [GOBBLIN- 1856] Add flow trigger handler leasing metrics (#3717)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0151890d1 [GOBBLIN- 1856] Add flow trigger handler leasing metrics (#3717)
0151890d1 is described below
commit 0151890d1f5274fb86827074a755e54adf189ba0
Author: meethngala <me...@gmail.com>
AuthorDate: Fri Jul 21 09:26:36 2023 -0700
[GOBBLIN- 1856] Add flow trigger handler leasing metrics (#3717)
* add flow trigger handler leasing metrics
* remove todo and resolve merge conflicts
* address PR comments
---------
Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 5 +++++
.../modules/orchestration/FlowTriggerHandler.java | 19 ++++++++++++++++++-
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 94e2a82c6..9e9ab1e69 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -34,6 +34,11 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";
+ // Flow Trigger Handler Lease Status Counts
+ public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leaseObtained";
+ public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.leasedToAnother";
+ public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + ".flowTriggerHandler.noLongerLeasing";
+
//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index ec63276c4..7fdee0384 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -38,8 +38,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
@@ -69,6 +71,12 @@ public class FlowTriggerHandler {
private MetricContext metricContext;
private ContextAwareMeter numFlowsSubmitted;
+ private ContextAwareCounter leaseObtainedCount;
+
+ private ContextAwareCounter leasedToAnotherStatusCount;
+
+ private ContextAwareCounter noLongerLeasingStatusCount;
+
@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
@@ -80,6 +88,9 @@ public class FlowTriggerHandler {
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
this.numFlowsSubmitted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
+ this.leaseObtainedCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
+ this.leasedToAnotherStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
+ this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
}
/**
@@ -94,10 +105,12 @@ public class FlowTriggerHandler {
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
- // TODO: add a log event or metric for each of these cases
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
+ log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", leaseObtainedStatus.getFlowAction(),
+ leaseObtainedStatus.getEventTimestamp());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
@@ -107,10 +120,14 @@ public class FlowTriggerHandler {
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ this.leasedToAnotherStatusCount.inc();
scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ this.noLongerLeasingStatusCount.inc();
+ log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: %s] ", leaseAttemptStatus.getClass().getName(),
+ eventTimeMillis);
return;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",