You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/10/20 21:35:31 UTC

[gobblin] branch master updated: [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e3102f11 [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)
2e3102f11 is described below

commit 2e3102f116986030a1b9f5f9d6cb6e031726a594
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Oct 20 14:35:26 2023 -0700

    [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (#3801)
    
    * Refactor dag action updating method & add clarifying comment
    
    * Log filtered out duplicate messages
    
    * logs and metrics for missing messages from change monitor
    
    * Only add gobblin.service prefix for dagActionStoreChangeMonitor
    
    ---------
    
    Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
 .../java/org/apache/gobblin/runtime/api/DagActionStore.java  |  7 +++----
 .../gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java    |  6 +++---
 .../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java  |  4 +++-
 .../runtime/api/MysqlMultiActiveLeaseArbiterTest.java        |  4 ++--
 .../service/modules/orchestration/FlowTriggerHandler.java    | 12 +++++++-----
 .../gobblin/service/monitoring/ChangeMonitorUtils.java       |  2 +-
 .../service/monitoring/DagActionStoreChangeMonitor.java      |  3 ++-
 7 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index eb26acd16..4f3442597 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -49,10 +49,9 @@ public interface DagActionStore {
     /**
      *   Replace flow execution id with agreed upon event time to easily track the flow
      */
-    public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
-        long eventTimeMillis) {
-      return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
-          String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+    public DagAction updateFlowExecutionId(long eventTimeMillis) {
+      return new DagAction(this.getFlowGroup(), this.getFlowName(),
+          String.valueOf(eventTimeMillis), this.getFlowActionType());
     }
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index c6161d936..338e908a2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -321,14 +321,14 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
       // Lease is valid
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
-          DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
+          DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
           log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
               updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
           return new LeasedToAnotherStatus(updatedFlowAction,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
         }
-        DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
+        DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
         log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
             updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
@@ -518,7 +518,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
       return new NoLongerLeasingStatus();
     }
-    DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
+    DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis);
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
           isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 5e8daaa26..7b494201e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -95,7 +95,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
    */
   @Getter
   private MetricContext metricContext;
-  private Counter messagesRead;
+  protected Counter messagesRead;
   @Getter
   private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
   private final ScheduledExecutorService consumerExecutor;
@@ -329,6 +329,8 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
           }
         }
       } catch (InterruptedException e) {
+        log.warn("Encountered exception while processing queue ", e);
+        // TODO: evaluate whether we should interrupt the thread or continue processing
         Thread.currentThread().interrupt();
       }
     }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 7bafc78ff..08630ab36 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -217,7 +217,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
-    DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+    DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
         selectInfoResult.getEventTimeMillis());
     boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
         updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
@@ -299,7 +299,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
      // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
      MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
          mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
-     DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+     DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
          selectInfoResult.getEventTimeMillis());
      boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
          updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 8abaa209c..c5a5bb8e0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -112,8 +112,12 @@ public class FlowTriggerHandler {
     if (multiActiveLeaseArbiter.isPresent()) {
       MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(
           flowAction, eventTimeMillis, isReminderEvent);
+      // The flow action contained in the `LeaseAttemptStatus` from the lease arbiter contains an updated flow execution
+      // id. From this point onwards, always use the newer version of the flow action to easily track the action through
+      // orchestration and execution.
       if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
-        MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+        MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus)
+            leaseAttemptStatus;
         this.leaseObtainedCount.inc();
         if (persistFlowAction(leaseObtainedStatus)) {
           log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
@@ -122,11 +126,9 @@ public class FlowTriggerHandler {
         }
         // If persisting the flow action failed, then we set another trigger for this event to occur immediately to
         // re-attempt handling the event
-        DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction,
-            leaseObtainedStatus.getEventTimeMillis());
         scheduleReminderForEvent(jobProps,
-            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
-            eventTimeMillis);
+            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getFlowAction(),
+                0L), eventTimeMillis);
         return;
       } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
         this.leasedToAnotherStatusCount.inc();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index 33934ef06..a2d68fbc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -35,7 +35,7 @@ public final class ChangeMonitorUtils {
       String operation, String timestamp) {
     // If we've already processed a message with this timestamp and key before then skip duplicate message
     if (cache.getIfPresent(changeIdentifier) != null) {
-      log.debug("Duplicate change event with identifier {}", changeIdentifier);
+      log.info("Duplicate change event with identifier {}", changeIdentifier);
       return false;
     }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 1435e076a..e5a2d090d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -216,7 +217,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
 
   @Override
   protected void createMetrics() {
-    super.createMetrics();
+    super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
     this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);