You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/10/17 23:33:38 UTC

[gobblin] branch master updated: [GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb51428c [GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
eeb51428c is described below

commit eeb51428ccbc7231a2831c26888827c043cdf3c0
Author: umustafi <um...@gmail.com>
AuthorDate: Tue Oct 17 16:33:32 2023 -0700

    [GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
    
    * Improve Multi-active related logs and metrics
    
    * Add more metrics and logs around forwarding dag action to DagManager
    
    * Improve logs in response to review comments
    
    * Replace flow execution id with trigger timestamp from multi-active
    
    * Update flow action execution id within lease arbiter
    
    * Fix test & make Lease Statuses more lean
    
    * Update javadoc
    
    ---------
    
    Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  6 ++++
 .../apache/gobblin/runtime/api/DagActionStore.java |  9 ++++++
 .../runtime/api/MultiActiveLeaseArbiter.java       | 27 ++++++++++++-----
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 35 ++++++++++++----------
 .../gobblin/runtime/metrics/RuntimeMetrics.java    | 17 ++++++-----
 .../api/MysqlMultiActiveLeaseArbiterTest.java      | 29 ++++++++++--------
 .../service/modules/orchestration/DagManager.java  | 19 ++++++++----
 .../modules/orchestration/DagManagerMetrics.java   | 13 ++++++++
 .../modules/orchestration/FlowTriggerHandler.java  |  6 ++--
 .../modules/orchestration/Orchestrator.java        | 11 ++++++-
 .../monitoring/DagActionStoreChangeMonitor.java    | 24 +++++++++------
 .../orchestration/FlowTriggerHandlerTest.java      |  4 +--
 12 files changed, 139 insertions(+), 61 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 1a544a3df..aff097bbd 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -43,6 +43,12 @@ public class ServiceMetricNames {
   public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
   public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
 
+  // DagManager Related Metrics
+  public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
+  public static final String
+      DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
+  public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";
+
   //Job status poll timer
   public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index 3e11d7c72..eb26acd16 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -45,6 +45,15 @@ public interface DagActionStore {
     public FlowId getFlowId() {
       return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
     }
+
+    /**
+     * Replace the flow execution id with the agreed-upon event time to easily track the flow
+     */
+    public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
+        long eventTimeMillis) {
+      return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
+          String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+    }
   }
 
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index faacb0995..253db49ba 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -79,28 +79,41 @@ public interface MultiActiveLeaseArbiter {
   class NoLongerLeasingStatus extends LeaseAttemptStatus {}
 
   /*
-  The participant calling this method acquired the lease for the event in question. The class contains the
-  `eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
-  `leaseAcquisitionTimestamp`.
+  The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id
+  is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
+  `leaseAcquisitionTimestamp` field.
   */
   @Data
   class LeaseObtainedStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction flowAction;
-    private final long eventTimestamp;
     private final long leaseAcquisitionTimestamp;
+
+    /**
+     * @return event time in millis since epoch for the event of this lease acquisition
+     */
+    public long getEventTimeMillis() {
+      return Long.parseLong(flowAction.getFlowExecutionId());
+    }
   }
 
   /*
   This flow action event already has a valid lease owned by another participant.
-  `eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
-  action corresponding to the same instance of the event or a distinct one.
+  `Flow action`'s flow execution id is the timestamp the lease is associated with, however the flow action event it
+  corresponds to may be a different and distinct occurrence of the same event.
   `minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
   the lease has completed or expired
    */
   @Data
   class LeasedToAnotherStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction flowAction;
-    private final long eventTimeMillis;
     private final long minimumLingerDurationMillis;
+
+    /**
+     * Returns the event time for the event whose lease was obtained by another participant.
+     * @return event time in millis since epoch
+     */
+    public long getEventTimeMillis() {
+      return Long.parseLong(flowAction.getFlowExecutionId());
+    }
 }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 4c2e8d2da..c6161d936 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -41,6 +41,8 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
+
 
 /**
  * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of
@@ -242,7 +244,6 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
     ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     Runnable retentionTask = () -> {
       try {
-        Thread.sleep(10000);
         withPreparedStatement(thisTableRetentionStatement,
             retentionStatement -> {
               retentionStatement.setLong(1, retentionPeriodMillis);
@@ -253,7 +254,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
               }
               return numRowsDeleted;
             }, true);
-      } catch (InterruptedException | IOException e) {
+      } catch (IOException e) {
         log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
             + "affect our system performance. Examine exception: ", e);
       }
@@ -307,7 +308,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
         }
         if (eventTimeMillis == dbEventTimestamp.getTime()) {
           // TODO: change this to a debug after fixing issue
-          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
+          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
                   + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
               eventTimeMillis, dbEventTimestamp);
         }
@@ -320,16 +321,18 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
       // Lease is valid
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
+          DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
           log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
-              flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+              updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
+          return new LeasedToAnotherStatus(updatedFlowAction,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
         }
+        DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
         log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
-            flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+            updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
-        return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
+        return new LeasedToAnotherStatus(updatedFlowAction,
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
@@ -515,16 +518,16 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
       return new NoLongerLeasingStatus();
     }
+    DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
     if (numRowsUpdated == 1) {
-      log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction,
+      log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
           isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
-      return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
-          selectInfoResult.getLeaseAcquisitionTimeMillis().get());
+      return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get());
     }
     log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
-        flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
+        updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
-    return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
+    return new LeasedToAnotherStatus(updatedFlowAction,
         selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
             - (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
   }
@@ -599,22 +602,22 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
           updateStatement.setString(++i, flowGroup);
           updateStatement.setString(++i, flowName);
           updateStatement.setString(++i, flowActionType.toString());
-          updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
+          updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
           updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
           int numRowsUpdated = updateStatement.executeUpdate();
           if (numRowsUpdated == 0) {
             log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
                 + "lease expired or event cleaned up before host completed required actions", flowAction,
-                status.getEventTimestamp());
+                status.getEventTimeMillis());
             return false;
           }
           if( numRowsUpdated == 1) {
             log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
-                    + " this event after this.", flowAction, status.getEventTimestamp());
+                    + " this event after this.", flowAction, status.getEventTimeMillis());
             return true;
           };
           throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
-                  + "rows than expected", flowAction, status.getEventTimestamp()));
+                  + "rows than expected", flowAction, status.getEventTimeMillis()));
         }, true);
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 33734cfcb..8fc1258ab 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
   public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
       ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
-  public static final String
-      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
+  public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
 
+  public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
+  public static final String
+      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 15090e8f1..7bafc78ff 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -88,10 +88,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(firstLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
-    Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
+    Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
         firstObtainedStatus.getLeaseAcquisitionTimestamp());
     Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
-        new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH)));
+        new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(firstObtainedStatus.getEventTimeMillis()),
+            DagActionStore.FlowActionType.LAUNCH)));
 
     // Verify that different DagAction types for the same flow can have leases at the same time
     DagActionStore.DagAction killDagAction = new
@@ -102,7 +103,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
     Assert.assertTrue(
-        killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp());
+        killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimeMillis());
 
     // Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db
     // Very little time should have passed if this test directly follows the one above so this call will be considered
@@ -112,7 +113,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(secondLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
-    Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), secondLeasedToAnotherStatus.getEventTimeMillis());
+    Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis());
     Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > 0);
 
     // Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
@@ -124,7 +125,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
-    Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimestamp());
+    Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimeMillis());
     Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < LINGER);
 
     // Tests CASE 4 of lease out of date
@@ -134,14 +135,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(fourthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
-    Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() > eventTimeMillis + LINGER);
-    Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
+    Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() > eventTimeMillis + LINGER);
+    Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
         <= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
 
     // Tests CASE 5 of no longer leasing the same event in DB
     // done immediately after previous lease obtainment so should be marked as the same event
     Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
-    Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimestamp() < EPSILON);
+    Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimeMillis() < EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
     Assert.assertTrue(fifthLaunchStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus);
@@ -154,7 +155,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
-    Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
+    Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
@@ -216,8 +217,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+        selectInfoResult.getEventTimeMillis());
     boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
-        resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+        updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertTrue(markedSuccess);
     // Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
     mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction,
@@ -281,7 +284,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
     Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
     LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
-    Assert.assertTrue(obtainedStatus.getEventTimestamp() > selectInfoResult.getEventTimeMillis());
+    Assert.assertTrue(obtainedStatus.getEventTimeMillis() > selectInfoResult.getEventTimeMillis());
     Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
   }
 
@@ -296,8 +299,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
      // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
      MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
          mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+     DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+         selectInfoResult.getEventTimeMillis());
      boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
-         resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+         updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
      Assert.assertTrue(markedSuccess);
 
      // Sleep enough time for the event to have been considered distinct
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c8d6bf598..b4ec9c0ce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -306,6 +306,8 @@ public class DagManager extends AbstractIdleService {
    * Note this should only be called from the {@link Orchestrator} or {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
    */
   public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dag (persist: {}, setStatus: {}): {}", persist, setStatus, dag);
     if (!isActive) {
       log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag);
       return;
@@ -509,14 +511,19 @@ public class DagManager extends AbstractIdleService {
       // Upon handling the action, delete it so on leadership change this is not duplicated
       this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
-      log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+      log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e);
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (SpecNotFoundException e) {
-      log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+      log.warn(String.format("Spec not found for flowId %s due to exception", flowId), e);
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (IOException e) {
-      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
-          + "stacktrace) due to exception {}", flowId, e.getMessage());
+      log.warn(String.format("Failed to add Job Execution Plan for flowId %s OR delete dag action from dagActionStore "
+          + "(check stacktrace) due to exception", flowId), e);
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (InterruptedException e) {
-      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+      log.warn(String.format("SpecCompiler failed to reach healthy state before compilation of flowId %s due to "
+              + "exception", flowId), e);
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     }
   }
 
@@ -620,6 +627,8 @@ public class DagManager extends AbstractIdleService {
             }
             //Initialize dag.
             initialize(dag);
+          } else {
+            log.warn("Null dag despite non-empty queue; ignoring the dag");
           }
         }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a5f34cff7..6d6c545b5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -75,6 +75,9 @@ public class DagManagerMetrics {
   private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
   private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
   private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();
+
+  // Metrics for unexpected flow handling failures
+  private ContextAwareCounter failedLaunchEventsOnActivationCount;
   MetricContext metricContext;
 
   public DagManagerMetrics(MetricContext metricContext) {
@@ -100,6 +103,9 @@ public class DagManagerMetrics {
           ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
       allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+      failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
+          MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
     }
   }
 
@@ -199,6 +205,13 @@ public class DagManagerMetrics {
     }
   }
 
+  // Increment the count for num of failed launches during leader activation
+  public void incrementFailedLaunchCount() {
+    if (this.metricContext != null) {
+      this.failedLaunchEventsOnActivationCount.inc();
+    }
+  }
+
   private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
     Config configs = dagNode.getValue().getJobSpec().getConfig();
     String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 6d9dcc9d6..8abaa209c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -117,13 +117,15 @@ public class FlowTriggerHandler {
         this.leaseObtainedCount.inc();
         if (persistFlowAction(leaseObtainedStatus)) {
           log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
-              leaseObtainedStatus.getEventTimestamp());
+              leaseObtainedStatus.getEventTimeMillis());
           return;
         }
         // If persisting the flow action failed, then we set another trigger for this event to occur immediately to
         // re-attempt handling the event
+        DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction,
+            leaseObtainedStatus.getEventTimeMillis());
         scheduleReminderForEvent(jobProps,
-            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, leaseObtainedStatus.getEventTimestamp(), 0L),
+            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
             eventTimeMillis);
         return;
       } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7afd8bba4..bcb174320 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -93,6 +94,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   private Optional<Meter> flowOrchestrationFailedMeter;
   @Getter
   private Optional<Timer> flowOrchestrationTimer;
+  private Optional<Counter> flowFailedForwardToDagManagerCounter;
   @Setter
   private FlowStatusGenerator flowStatusGenerator;
 
@@ -137,12 +139,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
       this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
       this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+      this.flowFailedForwardToDagManagerCounter = Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
       this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
     } else {
       this.metricContext = null;
       this.flowOrchestrationSuccessFulMeter = Optional.absent();
       this.flowOrchestrationFailedMeter = Optional.absent();
       this.flowOrchestrationTimer = Optional.absent();
+      this.flowFailedForwardToDagManagerCounter = Optional.absent();
       this.eventSubmitter = Optional.absent();
     }
     this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
@@ -337,6 +341,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     if (optionalJobExecutionPlanDag.isPresent()) {
       submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
     } else {
+      _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
@@ -347,9 +352,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       //Send the dag to the DagManager.
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
     } catch (Exception ex) {
+      String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
+      _log.warn("Orchestrator call - " + failureMessage, ex);
+      if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
+        this.flowFailedForwardToDagManagerCounter.get().inc();
+      }
       if (this.eventSubmitter.isPresent()) {
         // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
-        String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
         Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
         new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 870b68f53..1435e076a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -58,8 +58,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
   private ContextAwareMeter killsInvoked;
   private ContextAwareMeter resumesInvoked;
   private ContextAwareMeter flowsLaunched;
+  private ContextAwareMeter failedFlowLaunchSubmissions;
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
+  private ContextAwareMeter messageFilteredOutMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge
 
   private volatile Long produceToConsumeDelayValue = -1L;
@@ -130,19 +132,23 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
         produceTimestamp.toString())) {
+      this.messageFilteredOutMeter.mark();
       return;
     }
 
+    // Built once up front so that the log and error messages below can identify this dag action
+    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+        dagActionType);
+
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
       if (operation.equals("INSERT")) {
+        log.info("DagAction change ({}) received for flow: {}", dagActionType, dagAction);
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
-          log.info("Received insert dag action and about to send resume flow request");
           dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
-          log.info("Received insert dag action and about to send kill flow request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
           this.killsInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH)) {
@@ -150,10 +156,8 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
           if (!this.isMultiActiveSchedulerEnabled) {
             this.unexpectedErrors.mark();
             throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
-                + "mode for flowAction: %s",
-                new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, dagActionType)));
+                + "mode for flowAction: %s", dagAction));
           }
-          log.info("Received insert dag action and about to forward launch request to DagManager");
           submitFlowToDagManagerHelper(flowGroup, flowName);
         } else {
           log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
@@ -191,19 +195,19 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
       this.orchestrator.submitFlowToDagManager(spec);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
-      this.unexpectedErrors.mark();
+      this.failedFlowLaunchSubmissions.mark();
       return;
     } catch (SpecNotFoundException e) {
       log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
-      this.unexpectedErrors.mark();
+      this.failedFlowLaunchSubmissions.mark();
       return;
     } catch (IOException e) {
       log.warn("Failed to add Job Execution Plan for flowId {} due to exception {}", flowId, e.getMessage());
-      this.unexpectedErrors.mark();
+      this.failedFlowLaunchSubmissions.mark();
       return;
     } catch (InterruptedException e) {
       log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
-      this.unexpectedErrors.mark();
+      this.failedFlowLaunchSubmissions.mark();
       return;
     }
     // Only mark this if the dag was successfully added
@@ -216,8 +220,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
+    this.failedFlowLaunchSubmissions = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
     this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
index d580704d2..672892673 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
@@ -34,9 +34,9 @@ public class FlowTriggerHandlerTest {
   String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction flowAction = new DagActionStore.DagAction("flowName", "flowGroup",
-      "999999", DagActionStore.FlowActionType.LAUNCH);
+      String.valueOf(eventToRevisit), DagActionStore.FlowActionType.LAUNCH);
   MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
-      new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, eventToRevisit, minimumLingerDurationMillis);
+      new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, minimumLingerDurationMillis);
 
   /**
    * Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression