You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/11 23:25:45 UTC

incubator-gobblin git commit: [GOBBLIN-653] Create JobSucceededTimer tracking event to accurately track successful Gobblin jobs.[]

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 79878f992 -> 757b25b61


[GOBBLIN-653] Create JobSucceededTimer tracking event to accurately track successful Gobblin jobs.[]

Closes #2522 from sv2000/jobSuccessTrackingEvent


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/757b25b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/757b25b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/757b25b6

Branch: refs/heads/master
Commit: 757b25b611b195865328b52c3a4eb4cfa214e2af
Parents: 79878f9
Author: suvasude <su...@linkedin.biz>
Authored: Tue Dec 11 15:25:42 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 11 15:25:42 2018 -0800

----------------------------------------------------------------------
 .../gobblin/metrics/event/TimingEvent.java      |  1 +
 .../gobblin/runtime/AbstractJobLauncher.java    | 37 +++++++++++++++-----
 2 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8fd1462..0b3defa 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -40,6 +40,7 @@ public class TimingEvent {
     public static final String JOB_CANCEL = "JobCancelTimer";
     public static final String JOB_COMPLETE = "JobCompleteTimer";
     public static final String JOB_FAILED = "JobFailedTimer";
+    public static final String JOB_SUCCEEDED = "JobSucceededTimer";
   }
 
   public static class RunJobTimings {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7f359dc..5b008d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -333,6 +333,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
       throws JobException {
     String jobId = this.jobContext.getJobId();
     final JobState jobState = this.jobContext.getJobState();
+    boolean isWorkUnitsEmpty = false;
 
     try {
       MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
@@ -380,14 +381,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           this.eventSubmitter.submit(JobEvent.WORK_UNITS_EMPTY);
           LOG.warn("No work units have been created for job " + jobId);
           jobState.setState(JobState.RunningState.COMMITTED);
-          notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE,
-              new JobListenerAction() {
-                @Override
-                public void apply(JobListener jobListener, JobContext jobContext)
-                    throws Exception {
-                  jobListener.onJobCompletion(jobContext);
-                }
-              });
+          isWorkUnitsEmpty = true;
           return;
         }
 
@@ -485,11 +479,28 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
           cleanupStagingData(jobState);
           jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP));
-
           // Write job execution info to the job history store upon job termination
           this.jobContext.storeJobExecutionInfo();
         } finally {
           launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION));
+          if (isWorkUnitsEmpty) {
+            //If no WorkUnits are created, first send the JobCompleteTimer event.
+            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
+              @Override
+              public void apply(JobListener jobListener, JobContext jobContext)
+                  throws Exception {
+                jobListener.onJobCompletion(jobContext);
+              }
+            });
+            //Next, send the JobSucceededTimer event.
+            notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+              @Override
+              public void apply(JobListener jobListener, JobContext jobContext)
+                  throws Exception {
+                jobListener.onJobFailure(jobContext);
+              }
+            });
+          }
         }
       }
 
@@ -519,6 +530,14 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           }
         });
         throw new JobException(String.format("Job %s failed", jobId));
+      } else {
+        notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+          @Override
+          public void apply(JobListener jobListener, JobContext jobContext)
+              throws Exception {
+            jobListener.onJobFailure(jobContext);
+          }
+        });
       }
     } finally {
       // Stop metrics reporting