You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:31 UTC

[13/50] incubator-gobblin git commit: [GOBBLIN-400] Allow skipping execution of MR job in MR tasks.

[GOBBLIN-400] Allow skipping execution of MR job in MR tasks.

Closes #2274 from ibuenros/mr-job-ski


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fd3a547e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fd3a547e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fd3a547e

Branch: refs/heads/0.12.0
Commit: fd3a547ec2ea5deac87febc8b95268ed63d78240
Parents: af68d7e
Author: ibuenros <is...@gmail.com>
Authored: Thu Feb 1 13:06:16 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Feb 1 13:06:16 2018 -0800

----------------------------------------------------------------------
 .../gobblin/runtime/mapreduce/MRTask.java       | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fd3a547e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
index 3abee65..a2e56d7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
@@ -49,6 +49,7 @@ public class MRTask extends BaseAbstractTask {
     public static final String MR_JOB_STARTED_EVENT = "MRJobStarted";
     public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful";
     public static final String MR_JOB_FAILED = "MRJobFailed";
+    public static final String MR_JOB_SKIPPED = "MRJobSkipped";
 
     public static final String JOB_URL = "jobTrackingUrl";
     public static final String FAILURE_CONTEXT = "failureContext";
@@ -93,6 +94,14 @@ public class MRTask extends BaseAbstractTask {
     try {
       Job job = createJob();
 
+      if (job == null) {
+        log.info("No MR job created. Skipping.");
+        this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
+        this.eventSubmitter.submit(Events.MR_JOB_SKIPPED);
+        onSkippedMRJob();
+        return;
+      }
+
       job.submit();
       this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, Events.JOB_URL, job.getTrackingURL());
       job.waitForCompletion(false);
@@ -116,6 +125,10 @@ public class MRTask extends BaseAbstractTask {
     return Maps.newHashMap();
   }
 
+  /**
+   * Create the {@link Job} to run in this task.
+   * @return the {@link Job} to run. If this method returns null, no job will be run and the task will be marked as successful.
+   */
   protected Job createJob() throws IOException {
     Job job = Job.getInstance(new Configuration());
     for (Map.Entry<Object, Object> entry : this.taskContext.getTaskState().getProperties().entrySet()) {
@@ -127,4 +140,11 @@ public class MRTask extends BaseAbstractTask {
     return job;
   }
 
+  /**
+   * Called when a job is skipped (because {@link #createJob()} returned null).
+   */
+  protected void onSkippedMRJob() {
+    // do nothing
+  }
+
 }