You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/10/17 23:33:38 UTC
[gobblin] branch master updated: [GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eeb51428c [GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
eeb51428c is described below
commit eeb51428ccbc7231a2831c26888827c043cdf3c0
Author: umustafi <um...@gmail.com>
AuthorDate: Tue Oct 17 16:33:32 2023 -0700
[GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
* Improve Multi-active related logs and metrics
* Add more metrics and logs around forwarding dag action to DagManager
* Improve logs in response to review comments
* Replace flow execution id with trigger timestamp from multi-active
* Update flow action execution id within lease arbiter
* Fix test & make Lease Statuses more lean
* Update javadoc
---------
Co-authored-by: Urmi Mustafi <um...@linkedin.com>
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 6 ++++
.../apache/gobblin/runtime/api/DagActionStore.java | 9 ++++++
.../runtime/api/MultiActiveLeaseArbiter.java | 27 ++++++++++++-----
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 35 ++++++++++++----------
.../gobblin/runtime/metrics/RuntimeMetrics.java | 17 ++++++-----
.../api/MysqlMultiActiveLeaseArbiterTest.java | 29 ++++++++++--------
.../service/modules/orchestration/DagManager.java | 19 ++++++++----
.../modules/orchestration/DagManagerMetrics.java | 13 ++++++++
.../modules/orchestration/FlowTriggerHandler.java | 6 ++--
.../modules/orchestration/Orchestrator.java | 11 ++++++-
.../monitoring/DagActionStoreChangeMonitor.java | 24 +++++++++------
.../orchestration/FlowTriggerHandlerTest.java | 4 +--
12 files changed, 139 insertions(+), 61 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 1a544a3df..aff097bbd 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -43,6 +43,12 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
+ // DagManager Related Metrics
+ public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
+ public static final String
+ DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
+ public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";
+
//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index 3e11d7c72..eb26acd16 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -45,6 +45,15 @@ public interface DagActionStore {
public FlowId getFlowId() {
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}
+
+ /**
+ * Replace flow execution id with agreed upon event time to easily track the flow
+ */
+ public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction,
+ long eventTimeMillis) {
+ return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
+ String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index faacb0995..253db49ba 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -79,28 +79,41 @@ public interface MultiActiveLeaseArbiter {
class NoLongerLeasingStatus extends LeaseAttemptStatus {}
/*
- The participant calling this method acquired the lease for the event in question. The class contains the
- `eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
- `leaseAcquisitionTimestamp`.
+ The participant calling this method acquired the lease for the event in question. `Flow action`'s flow execution id
+ is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
+ `leaseAcquisitionTimestamp` field.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
- private final long eventTimestamp;
private final long leaseAcquisitionTimestamp;
+
+ /**
+ * @return event time in millis since epoch for the event of this lease acquisition
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(flowAction.getFlowExecutionId());
+ }
}
/*
This flow action event already has a valid lease owned by another participant.
- `eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
- action corresponding to the same instance of the event or a distinct one.
+ `Flow action`'s flow execution id is the timestamp the lease is associated with, however the flow action event it
+ corresponds to may be a different and distinct occurrence of the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
- private final long eventTimeMillis;
private final long minimumLingerDurationMillis;
+
+ /**
+ * Returns event time in millis since epoch for the event whose lease was obtained by another participant.
+ * @return the event time in millis since epoch, parsed from the flow action's flow execution id
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(flowAction.getFlowExecutionId());
+ }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 4c2e8d2da..c6161d936 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -41,6 +41,8 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
+
/**
* MySQL based implementation of the {@link MultiActiveLeaseArbiter} which uses a MySQL store to resolve ownership of
@@ -242,7 +244,6 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
- Thread.sleep(10000);
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
@@ -253,7 +254,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
}
return numRowsDeleted;
}, true);
- } catch (InterruptedException | IOException e) {
+ } catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
@@ -307,7 +308,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
+ log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
@@ -320,16 +321,18 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
+ DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
- flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+ updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
+ DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
- flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+ updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
- return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
@@ -515,16 +518,16 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
+ DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
- log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", flowAction,
+ log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
- return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
- selectInfoResult.getLeaseAcquisitionTimeMillis().get());
+ return new LeaseObtainedStatus(updatedFlowAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
- flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
+ updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
@@ -599,22 +602,22 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
- updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get());
+ updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
+ "lease expired or event cleaned up before host completed required actions", flowAction,
- status.getEventTimestamp());
+ status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
- + " this event after this.", flowAction, status.getEventTimestamp());
+ + " this event after this.", flowAction, status.getEventTimeMillis());
return true;
};
throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
- + "rows than expected", flowAction, status.getEventTimestamp()));
+ + "rows than expected", flowAction, status.getEventTimeMillis()));
}, true);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 33734cfcb..8fc1258ab 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
- public static final String
- GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
+ public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
+ public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
+ public static final String
+ GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 15090e8f1..7bafc78ff 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -88,10 +88,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
- Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
+ Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH)));
+ new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(firstObtainedStatus.getEventTimeMillis()),
+ DagActionStore.FlowActionType.LAUNCH)));
// Verify that different DagAction types for the same flow can have leases at the same time
DagActionStore.DagAction killDagAction = new
@@ -102,7 +103,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
Assert.assertTrue(
- killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp());
+ killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimeMillis());
// Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db
// Very little time should have passed if this test directly follows the one above so this call will be considered
@@ -112,7 +113,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(secondLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
- Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), secondLeasedToAnotherStatus.getEventTimeMillis());
+ Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > 0);
// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
@@ -124,7 +125,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
- Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimestamp());
+ Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimeMillis());
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < LINGER);
// Tests CASE 4 of lease out of date
@@ -134,14 +135,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(fourthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
- Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() > eventTimeMillis + LINGER);
- Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
+ Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() > eventTimeMillis + LINGER);
+ Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
<= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
// Tests CASE 5 of no longer leasing the same event in DB
// done immediately after previous lease obtainment so should be marked as the same event
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
- Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimestamp() < EPSILON);
+ Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimeMillis() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus);
@@ -154,7 +155,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
- Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
+ Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
@@ -216,8 +217,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
- resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction,
@@ -281,7 +284,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
- Assert.assertTrue(obtainedStatus.getEventTimestamp() > selectInfoResult.getEventTimeMillis());
+ Assert.assertTrue(obtainedStatus.getEventTimeMillis() > selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
}
@@ -296,8 +299,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
- resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Sleep enough time for the event to have been considered distinct
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c8d6bf598..b4ec9c0ce 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -306,6 +306,8 @@ public class DagManager extends AbstractIdleService {
* Note this should only be called from the {@link Orchestrator} or {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dag (persist: {}, setStatus: {}): {}", persist, setStatus, dag);
if (!isActive) {
log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag);
return;
@@ -509,14 +511,19 @@ public class DagManager extends AbstractIdleService {
// Upon handling the action, delete it so on leadership change this is not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
- log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+ log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+ log.warn(String.format("Spec not found for flowId %s due to exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (IOException e) {
- log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
- + "stacktrace) due to exception {}", flowId, e.getMessage());
+ log.warn(String.format("Failed to add Job Execution Plan for flowId %s OR delete dag action from dagActionStore "
+ + "(check stacktrace) due to exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (InterruptedException e) {
- log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+ log.warn(String.format("SpecCompiler failed to reach healthy state before compilation of flowId %s due to "
+ + "exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
}
}
@@ -620,6 +627,8 @@ public class DagManager extends AbstractIdleService {
}
//Initialize dag.
initialize(dag);
+ } else {
+ log.warn("Null dag despite non-empty queue; ignoring the dag");
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a5f34cff7..6d6c545b5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -75,6 +75,9 @@ public class DagManagerMetrics {
private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();
+
+ // Metrics for unexpected flow handling failures
+ private ContextAwareCounter failedLaunchEventsOnActivationCount;
MetricContext metricContext;
public DagManagerMetrics(MetricContext metricContext) {
@@ -100,6 +103,9 @@ public class DagManagerMetrics {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+ failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
}
}
@@ -199,6 +205,13 @@ public class DagManagerMetrics {
}
}
+ // Increment the count for num of failed launches during leader activation
+ public void incrementFailedLaunchCount() {
+ if (this.metricContext != null) {
+ this.failedLaunchEventsOnActivationCount.inc();
+ }
+ }
+
private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 6d9dcc9d6..8abaa209c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -117,13 +117,15 @@ public class FlowTriggerHandler {
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(),
- leaseObtainedStatus.getEventTimestamp());
+ leaseObtainedStatus.getEventTimeMillis());
return;
}
// If persisting the flow action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
+ DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction,
+ leaseObtainedStatus.getEventTimeMillis());
scheduleReminderForEvent(jobProps,
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, leaseObtainedStatus.getEventTimestamp(), 0L),
+ new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7afd8bba4..bcb174320 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
@@ -93,6 +94,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
+ private Optional<Counter> flowFailedForwardToDagManagerCounter;
@Setter
private FlowStatusGenerator flowStatusGenerator;
@@ -137,12 +139,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+ this.flowFailedForwardToDagManagerCounter = Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
+ this.flowFailedForwardToDagManagerCounter = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
@@ -337,6 +341,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
+ _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}
@@ -347,9 +352,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
+ String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
+ _log.warn("Orchestrator call - " + failureMessage, ex);
+ if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
+ this.flowFailedForwardToDagManagerCounter.get().inc();
+ }
if (this.eventSubmitter.isPresent()) {
// pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
- String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 870b68f53..1435e076a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -58,8 +58,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
private ContextAwareMeter killsInvoked;
private ContextAwareMeter resumesInvoked;
private ContextAwareMeter flowsLaunched;
+ private ContextAwareMeter failedFlowLaunchSubmissions;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
+ private ContextAwareMeter messageFilteredOutMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge
private volatile Long produceToConsumeDelayValue = -1L;
@@ -130,19 +132,23 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
produceTimestamp.toString())) {
+ this.messageFilteredOutMeter.mark();
return;
}
+ // Used to easily log information to identify the dag action
+ DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+ dagActionType);
+
// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
if (operation.equals("INSERT")) {
+ log.info("DagAction change ({}) received for flow: {}", dagActionType, dagAction);
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
- log.info("Received insert dag action and about to send resume flow request");
dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
- log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH)) {
@@ -150,10 +156,8 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
if (!this.isMultiActiveSchedulerEnabled) {
this.unexpectedErrors.mark();
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
- + "mode for flowAction: %s",
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, dagActionType)));
+ + "mode for flowAction: %s", dagAction));
}
- log.info("Received insert dag action and about to forward launch request to DagManager");
submitFlowToDagManagerHelper(flowGroup, flowName);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
@@ -191,19 +195,19 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} due to exception {}", flowId, e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
}
// Only mark this if the dag was successfully added
@@ -216,8 +220,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
+ this.failedFlowLaunchSubmissions = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+ this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
index d580704d2..672892673 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
@@ -34,9 +34,9 @@ public class FlowTriggerHandlerTest {
String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction flowAction = new DagActionStore.DagAction("flowName", "flowGroup",
- "999999", DagActionStore.FlowActionType.LAUNCH);
+ String.valueOf(eventToRevisit), DagActionStore.FlowActionType.LAUNCH);
MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, eventToRevisit, minimumLingerDurationMillis);
+ new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, minimumLingerDurationMillis);
/**
* Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression