You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/10/20 21:35:31 UTC
[gobblin] branch master updated: [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2e3102f11 [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)
2e3102f11 is described below
commit 2e3102f116986030a1b9f5f9d6cb6e031726a594
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Oct 20 14:35:26 2023 -0700
[GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)
* Refactor dag action updating method & add clarifying comment
* Log filtered out duplicate messages
* Add logs and metrics for missing messages from change monitor
* Only add gobblin.service prefix for dagActionStoreChangeMonitor
---------
Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
.../java/org/apache/gobblin/runtime/api/DagActionStore.java | 7 +++----
.../gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java | 6 +++---
.../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 4 +++-
.../runtime/api/MysqlMultiActiveLeaseArbiterTest.java | 4 ++--
.../service/modules/orchestration/FlowTriggerHandler.java | 12 +++++++-----
.../gobblin/service/monitoring/ChangeMonitorUtils.java | 2 +-
.../service/monitoring/DagActionStoreChangeMonitor.java | 3 ++-
7 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index eb26acd16..4f3442597 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -49,10 +49,9 @@ public interface DagActionStore {
/**
* Replace flow execution id with agreed upon event time to easily track the flow
*/
- public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
- long eventTimeMillis) {
- return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
- String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+ public DagAction updateFlowExecutionId(long eventTimeMillis) {
+ return new DagAction(this.getFlowGroup(), this.getFlowName(),
+ String.valueOf(eventTimeMillis), this.getFlowActionType());
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index c6161d936..338e908a2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -321,14 +321,14 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
- DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
- DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
@@ -518,7 +518,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
- DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
+ DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 5e8daaa26..7b494201e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -95,7 +95,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
*/
@Getter
private MetricContext metricContext;
- private Counter messagesRead;
+ protected Counter messagesRead;
@Getter
private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
private final ScheduledExecutorService consumerExecutor;
@@ -329,6 +329,8 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
}
}
} catch (InterruptedException e) {
+ log.warn("Encountered exception while processing queue ", e);
+ // TODO: evaluate whether we should interrupt the thread or continue processing
Thread.currentThread().interrupt();
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 7bafc78ff..08630ab36 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -217,7 +217,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
@@ -299,7 +299,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 8abaa209c..c5a5bb8e0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -112,8 +112,12 @@ public class FlowTriggerHandler {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(
flowAction, eventTimeMillis, isReminderEvent);
+ // The flow action contained in the `LeaseAttemptStatus` from the lease arbiter contains an updated flow execution
+ // id. From this point onwards, always use the newer version of the flow action to easily track the action through
+ // orchestration and execution.
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
- MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus)
+ leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
@@ -122,11 +126,9 @@ public class FlowTriggerHandler {
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
- DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction,
- leaseObtainedStatus.getEventTimeMillis());
scheduleReminderForEvent(jobProps,
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
- eventTimeMillis);
+ new MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getFlowAction(),
+ 0L), eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.leasedToAnotherStatusCount.inc();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index 33934ef06..a2d68fbc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -35,7 +35,7 @@ public final class ChangeMonitorUtils {
String operation, String timestamp) {
// If we've already processed a message with this timestamp and key before then skip duplicate message
if (cache.getIfPresent(changeIdentifier) != null) {
- log.debug("Duplicate change event with identifier {}", changeIdentifier);
+ log.info("Duplicate change event with identifier {}", changeIdentifier);
return false;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 1435e076a..e5a2d090d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -216,7 +217,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
@Override
protected void createMetrics() {
- super.createMetrics();
+ super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);