You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/11 23:25:45 UTC
incubator-gobblin git commit: [GOBBLIN-653] Create JobSucceededTimer
tracking event to accurately track successful Gobblin jobs.[]
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 79878f992 -> 757b25b61
[GOBBLIN-653] Create JobSucceededTimer tracking event to accurately track successful Gobblin jobs.[]
Closes #2522 from sv2000/jobSuccessTrackingEvent
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/757b25b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/757b25b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/757b25b6
Branch: refs/heads/master
Commit: 757b25b611b195865328b52c3a4eb4cfa214e2af
Parents: 79878f9
Author: suvasude <su...@linkedin.biz>
Authored: Tue Dec 11 15:25:42 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 11 15:25:42 2018 -0800
----------------------------------------------------------------------
.../gobblin/metrics/event/TimingEvent.java | 1 +
.../gobblin/runtime/AbstractJobLauncher.java | 37 +++++++++++++++-----
2 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8fd1462..0b3defa 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -40,6 +40,7 @@ public class TimingEvent {
public static final String JOB_CANCEL = "JobCancelTimer";
public static final String JOB_COMPLETE = "JobCompleteTimer";
public static final String JOB_FAILED = "JobFailedTimer";
+ public static final String JOB_SUCCEEDED = "JobSucceededTimer";
}
public static class RunJobTimings {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7f359dc..5b008d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -333,6 +333,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
throws JobException {
String jobId = this.jobContext.getJobId();
final JobState jobState = this.jobContext.getJobState();
+ boolean isWorkUnitsEmpty = false;
try {
MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
@@ -380,14 +381,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_EMPTY);
LOG.warn("No work units have been created for job " + jobId);
jobState.setState(JobState.RunningState.COMMITTED);
- notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE,
- new JobListenerAction() {
- @Override
- public void apply(JobListener jobListener, JobContext jobContext)
- throws Exception {
- jobListener.onJobCompletion(jobContext);
- }
- });
+ isWorkUnitsEmpty = true;
return;
}
@@ -485,11 +479,28 @@ public abstract class AbstractJobLauncher implements JobLauncher {
TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
cleanupStagingData(jobState);
jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP));
-
// Write job execution info to the job history store upon job termination
this.jobContext.storeJobExecutionInfo();
} finally {
launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION));
+ if (isWorkUnitsEmpty) {
+ //If no WorkUnits are created, first send the JobCompleteTimer event.
+ notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
+ @Override
+ public void apply(JobListener jobListener, JobContext jobContext)
+ throws Exception {
+ jobListener.onJobCompletion(jobContext);
+ }
+ });
+ //Next, send the JobSucceededTimer event.
+ notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+ @Override
+ public void apply(JobListener jobListener, JobContext jobContext)
+ throws Exception {
+ jobListener.onJobFailure(jobContext);
+ }
+ });
+ }
}
}
@@ -519,6 +530,14 @@ public abstract class AbstractJobLauncher implements JobLauncher {
}
});
throw new JobException(String.format("Job %s failed", jobId));
+ } else {
+ notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+ @Override
+ public void apply(JobListener jobListener, JobContext jobContext)
+ throws Exception {
+ jobListener.onJobFailure(jobContext);
+ }
+ });
}
} finally {
// Stop metrics reporting