You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:31 UTC
[13/50] incubator-gobblin git commit: [GOBBLIN-400] Allow skipping
execution of MR job in MR tasks.
[GOBBLIN-400] Allow skipping execution of MR job in MR tasks.
Closes #2274 from ibuenros/mr-job-ski
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fd3a547e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fd3a547e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fd3a547e
Branch: refs/heads/0.12.0
Commit: fd3a547ec2ea5deac87febc8b95268ed63d78240
Parents: af68d7e
Author: ibuenros <is...@gmail.com>
Authored: Thu Feb 1 13:06:16 2018 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Thu Feb 1 13:06:16 2018 -0800
----------------------------------------------------------------------
.../gobblin/runtime/mapreduce/MRTask.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fd3a547e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
index 3abee65..a2e56d7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
@@ -49,6 +49,7 @@ public class MRTask extends BaseAbstractTask {
public static final String MR_JOB_STARTED_EVENT = "MRJobStarted";
public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful";
public static final String MR_JOB_FAILED = "MRJobFailed";
+ public static final String MR_JOB_SKIPPED = "MRJobSkipped";
public static final String JOB_URL = "jobTrackingUrl";
public static final String FAILURE_CONTEXT = "failureContext";
@@ -93,6 +94,14 @@ public class MRTask extends BaseAbstractTask {
try {
Job job = createJob();
+ if (job == null) {
+ log.info("No MR job created. Skipping.");
+ this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
+ this.eventSubmitter.submit(Events.MR_JOB_SKIPPED);
+ onSkippedMRJob();
+ return;
+ }
+
job.submit();
this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, Events.JOB_URL, job.getTrackingURL());
job.waitForCompletion(false);
@@ -116,6 +125,10 @@ public class MRTask extends BaseAbstractTask {
return Maps.newHashMap();
}
+ /**
+ * Create the {@link Job} to run in this task.
+ * @return the {@link Job} to run. If this method returns null, no job will be run and the task will be marked as successful.
+ */
protected Job createJob() throws IOException {
Job job = Job.getInstance(new Configuration());
for (Map.Entry<Object, Object> entry : this.taskContext.getTaskState().getProperties().entrySet()) {
@@ -127,4 +140,11 @@ public class MRTask extends BaseAbstractTask {
return job;
}
+ /**
+ * Called when a job is skipped (because {@link #createJob()} returned null).
+ */
+ protected void onSkippedMRJob() {
+ // do nothing
+ }
+
}