You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/03/04 01:29:14 UTC

hadoop git commit: MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public API in DistCp. Contributed by Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk b2f1ec312 -> 5af693fde


MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public API in DistCp. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5af693fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5af693fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5af693fd

Branch: refs/heads/trunk
Commit: 5af693fde26755b6f175bd65f93cf4a80de0d1e0
Parents: b2f1ec3
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Tue Mar 3 16:28:22 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Mar 3 16:28:41 2015 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../java/org/apache/hadoop/tools/DistCp.java    | 47 +++++++++++++++-----
 2 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5af693fd/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7a2eff3..b2ae9d9 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -320,6 +320,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-5612. Add javadoc for TaskCompletionEvent.Status.
     (Chris Palmer via aajisaka)
 
+    MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public
+    API in DistCp. (Jing Zhao via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5af693fd/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index 28535a7..b80aeb8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.tools;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,12 +53,14 @@ import com.google.common.annotations.VisibleForTesting;
  * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
  * behaviour.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public class DistCp extends Configured implements Tool {
 
   /**
-   * Priority of the ResourceManager shutdown hook.
+   * Priority of the shutdown hook.
    */
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private static final Log LOG = LogFactory.getLog(DistCp.class);
 
@@ -66,7 +70,7 @@ public class DistCp extends Configured implements Tool {
   private static final String PREFIX = "_distcp";
   private static final String WIP_PREFIX = "._WIP_";
   private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
-  public static final Random rand = new Random();
+  static final Random rand = new Random();
 
   private boolean submitted;
   private FileSystem jobFS;
@@ -90,7 +94,7 @@ public class DistCp extends Configured implements Tool {
    * To be used with the ToolRunner. Not for public consumption.
    */
   @VisibleForTesting
-  public DistCp() {}
+  DistCp() {}
 
   /**
    * Implementation of Tool::run(). Orchestrates the copy of source file(s)
@@ -100,6 +104,7 @@ public class DistCp extends Configured implements Tool {
    * @param argv List of arguments passed to DistCp, from the ToolRunner.
    * @return On success, it returns 0. Else, -1.
    */
+  @Override
   public int run(String[] argv) {
     if (argv.length < 1) {
       OptionsParser.usage();
@@ -145,9 +150,21 @@ public class DistCp extends Configured implements Tool {
    * @throws Exception
    */
   public Job execute() throws Exception {
+    Job job = createAndSubmitJob();
+
+    if (inputOptions.shouldBlock()) {
+      waitForJobCompletion(job);
+    }
+    return job;
+  }
+
+  /**
+   * Create and submit the mapreduce job.
+   * @return The mapreduce job object that has been submitted
+   */
+  public Job createAndSubmitJob() throws Exception {
     assert inputOptions != null;
     assert getConf() != null;
-
     Job job = null;
     try {
       synchronized(this) {
@@ -169,16 +186,24 @@ public class DistCp extends Configured implements Tool {
 
     String jobID = job.getJobID().toString();
     job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
-    
     LOG.info("DistCp job-id: " + jobID);
-    if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) {
-      throw new IOException("DistCp failure: Job " + jobID + " has failed: "
-          + job.getStatus().getFailureInfo());
-    }
+
     return job;
   }
 
   /**
+   * Wait for the given job to complete.
+   * @param job the given mapreduce job that has already been submitted
+   */
+  public void waitForJobCompletion(Job job) throws Exception {
+    assert job != null;
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("DistCp failure: Job " + job.getJobID()
+          + " has failed: " + job.getStatus().getFailureInfo());
+    }
+  }
+
+  /**
    * Set targetPathExists in both inputOptions and job config,
    * for the benefit of CopyCommitter
    */
@@ -436,7 +461,7 @@ public class DistCp extends Configured implements Tool {
   private static class Cleanup implements Runnable {
     private final DistCp distCp;
 
-    public Cleanup(DistCp distCp) {
+    Cleanup(DistCp distCp) {
       this.distCp = distCp;
     }