You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yj...@apache.org on 2015/03/06 01:26:30 UTC

hadoop git commit: HDFS-7312. Update DistCp v1 to optionally not use tmp location. (Joseph Prosser via yzhangal)

Repository: hadoop
Updated Branches:
  refs/heads/branch-1 eb5191195 -> 22543d34b


HDFS-7312. Update DistCp v1 to optionally not use tmp location. (Joseph Prosser via yzhangal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22543d34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22543d34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22543d34

Branch: refs/heads/branch-1
Commit: 22543d34bc5ca83c242fae075d42d705a4c0c34d
Parents: eb51911
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Tue Mar 3 20:08:17 2015 -0800
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Thu Mar 5 15:53:22 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/hadoop/fs/TestCopyFiles.java     | 414 ++++++++++++-------
 src/tools/org/apache/hadoop/tools/DistCp.java   |  60 ++-
 3 files changed, 316 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3903f0..786ab47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -265,6 +265,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-6198. NPE from JobTracker#resolveAndAddToTopology in MR1 cause
     initJob and heartbeat failure. (zxu via rkanter)
 
+    HDFS-7312. Update DistCp v1 to optionally not use tmp location.
+    (Joseph Prosser via yzhangal)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/src/test/org/apache/hadoop/fs/TestCopyFiles.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/hadoop/fs/TestCopyFiles.java b/src/test/org/apache/hadoop/fs/TestCopyFiles.java
index 9ce0769..0fe63d4 100644
--- a/src/test/org/apache/hadoop/fs/TestCopyFiles.java
+++ b/src/test/org/apache/hadoop/fs/TestCopyFiles.java
@@ -308,8 +308,26 @@ public class TestCopyFiles extends TestCase {
     deldir(localfs, TEST_ROOT_DIR+"/srcdat");
   }
 
-  /** copy files from dfs file system to dfs file system */
-  public void testCopyFromDfsToDfs() throws Exception {
+  private static void addToArgList(List<String> argList, final String... args) {
+	for (String arg : args) {
+	  argList.add(arg);
+	}
+  }
+  
+  private void addSrcDstToArgList(List<String> argList, final boolean skipTmp,
+      final String dst, final String... srcs) {
+    if (skipTmp) {
+      argList.add("-skiptmp");
+    }
+    addToArgList(argList, srcs);
+    argList.add(dst);
+  }
+
+  /**
+   * copy files from dfs file system to dfs file system Pass option to use
+   * -skiptmp flag
+   */
+  private void testCopyFromDfsToDfs(boolean skipTmp) throws Exception {
     String namenode = null;
     MiniDFSCluster cluster = null;
     try {
@@ -319,24 +337,37 @@ public class TestCopyFiles extends TestCase {
       namenode = FileSystem.getDefaultUri(conf).toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
-        ToolRunner.run(new DistCp(conf), new String[] {
-                                         "-log",
-                                         namenode+"/logs",
-                                         namenode+"/srcdat",
-                                         namenode+"/destdat"});
+        List<String> argList = new ArrayList<String>();
+        addToArgList(argList, "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         assertTrue("Source and destination directories do not match.",
-                   checkFiles(hdfs, "/destdat", files));
-        FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
+            checkFiles(hdfs, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create(namenode + "/logs"), conf);
         assertTrue("Log directory does not exist.",
-                   fs.exists(new Path(namenode+"/logs")));
+            fs.exists(new Path(namenode + "/logs")));
         deldir(hdfs, "/destdat");
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
     } finally {
-      if (cluster != null) { cluster.shutdown(); }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
+
+  /** copy files from dfs file system to dfs file system */
+  public void testCopyFromDfsToDfs() throws Exception {
+    testCopyFromDfsToDfs(false);
+  }
+
+  /** copy files from dfs file system to dfs file system with skiptmp */
+  public void testCopyFromDfsToDfsWithSkiptmp() throws Exception {
+    testCopyFromDfsToDfs(true);
+  }
   
   /** copy files from local file system to dfs file system */
   public void testCopyFromLocalToDfs() throws Exception {
@@ -362,7 +393,9 @@ public class TestCopyFiles extends TestCase {
         deldir(FileSystem.get(LOCAL_FS_URI, conf), TEST_ROOT_DIR+"/srcdat");
       }
     } finally {
-      if (cluster != null) { cluster.shutdown(); }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
@@ -395,7 +428,8 @@ public class TestCopyFiles extends TestCase {
     }
   }
 
-  public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+  private void testCopyDfsToDfsUpdateOverwrite(boolean skipTmp)
+      throws Exception {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
@@ -404,75 +438,86 @@ public class TestCopyFiles extends TestCase {
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
-        ToolRunner.run(new DistCp(conf), new String[] {
-                                         "-p",
-                                         "-log",
-                                         namenode+"/logs",
-                                         namenode+"/srcdat",
-                                         namenode+"/destdat"});
+        // run without update
+        List<String> argList = new ArrayList<String>();
+        addToArgList(argList, "-p", "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         assertTrue("Source and destination directories do not match.",
-                   checkFiles(hdfs, "/destdat", files));
-        FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
+            checkFiles(hdfs, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create(namenode + "/logs"), conf);
         assertTrue("Log directory does not exist.",
-                    fs.exists(new Path(namenode+"/logs")));
+            fs.exists(new Path(namenode + "/logs")));
 
         FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files);
-        final int nupdate = NFILES>>2;
+        final int nupdate = NFILES >> 2;
         updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate);
         deldir(hdfs, "/logs");
-
-        ToolRunner.run(new DistCp(conf), new String[] {
-                                         "-p",
-                                         "-update",
-                                         "-log",
-                                         namenode+"/logs",
-                                         namenode+"/srcdat",
-                                         namenode+"/destdat"});
+        argList.clear();
+        // run with update
+        addToArgList(argList, "-p", "-update", "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         assertTrue("Source and destination directories do not match.",
-                   checkFiles(hdfs, "/destdat", files));
+            checkFiles(hdfs, "/destdat", files));
         assertTrue("Update failed to replicate all changes in src",
-                 checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
+            checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
 
         deldir(hdfs, "/logs");
-        ToolRunner.run(new DistCp(conf), new String[] {
-                                         "-p",
-                                         "-overwrite",
-                                         "-log",
-                                         namenode+"/logs",
-                                         namenode+"/srcdat",
-                                         namenode+"/destdat"});
+        argList.clear();
+        // run with overwrite
+        addToArgList(argList, "-p", "-overwrite", "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         assertTrue("Source and destination directories do not match.",
-                   checkFiles(hdfs, "/destdat", files));
+            checkFiles(hdfs, "/destdat", files));
         assertTrue("-overwrite didn't.",
-                 checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
+            checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
 
         deldir(hdfs, "/destdat");
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
     } finally {
-      if (cluster != null) { cluster.shutdown(); }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
-  public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+  public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+    testCopyDfsToDfsUpdateOverwrite(false);
+  }
+
+  public void testCopyDfsToDfsUpdateOverwriteSkiptmp() throws Exception {
+    testCopyDfsToDfsUpdateOverwrite(true);
+  }
+
+  private void testCopyDfsToDfsUpdateWithSkipCRC(boolean skipTmp)
+      throws Exception {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = hdfs.getUri().toString();
-      
+
       FileSystem fs = FileSystem.get(URI.create(namenode), new Configuration());
       // Create two files of the same name, same length but different
       // contents
       final String testfilename = "test";
       final String srcData = "act act act";
       final String destData = "cat cat cat";
-      
+
       if (namenode.startsWith("hdfs://")) {
-        deldir(hdfs,"/logs");
-        
+        deldir(hdfs, "/logs");
+
         Path srcPath = new Path("/srcdat", testfilename);
         Path destPath = new Path("/destdat", testfilename);
         FSDataOutputStream out = fs.create(srcPath, true);
@@ -482,17 +527,13 @@ public class TestCopyFiles extends TestCase {
         out = fs.create(destPath, true);
         out.writeUTF(destData);
         out.close();
-        
         // Run with -skipcrccheck option
-        ToolRunner.run(new DistCp(conf), new String[] {
-          "-p",
-          "-update",
-          "-skipcrccheck",
-          "-log",
-          namenode+"/logs",
-          namenode+"/srcdat",
-          namenode+"/destdat"});
-        
+        List<String> argList = new ArrayList<String>();
+        addToArgList(argList, "-p", "-update", "-skipcrccheck", "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         // File should not be overwritten
         FSDataInputStream in = hdfs.open(destPath);
         String s = in.readUTF();
@@ -500,18 +541,14 @@ public class TestCopyFiles extends TestCase {
         assertTrue("Dest got over written even with skip crc",
             s.equalsIgnoreCase(destData));
         in.close();
-        
         deldir(hdfs, "/logs");
-
-        // Run without the option        
-        ToolRunner.run(new DistCp(conf), new String[] {
-          "-p",
-          "-update",
-          "-log",
-          namenode+"/logs",
-          namenode+"/srcdat",
-          namenode+"/destdat"});
-        
+        argList.clear();
+        // Run without the option
+        addToArgList(argList, "-p", "-update", "-log", namenode + "/logs");
+        addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+            + "/srcdat");
+        ToolRunner.run(new DistCp(conf),
+            argList.toArray(new String[argList.size()]));
         // File should be overwritten
         in = hdfs.open(destPath);
         s = in.readUTF();
@@ -524,18 +561,29 @@ public class TestCopyFiles extends TestCase {
         deldir(hdfs, "/destdat");
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
-       }
+      }
     } finally {
-      if (cluster != null) { cluster.shutdown(); }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
+  public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+    testCopyDfsToDfsUpdateWithSkipCRC(false);
+  }
+
+  public void testCopyDfsToDfsUpdateWithSkipCRCSkiptmp() throws Exception {
+    testCopyDfsToDfsUpdateWithSkipCRC(true);
+  }
+
   /**
    * A helper function to test copying files between local file system and dfs
-   * file system, with staging area set to local file system. 
+   * file system, with staging area set to local file system.
    */
   private void stagingAreaTest(final FileSystem srcFs, final FileSystem destFs,
-      MiniDFSCluster cluster, Configuration conf) throws Exception {
+      MiniDFSCluster cluster, Configuration conf, boolean skipTmp)
+      throws Exception {
     try {
       final String fileDir = "/files";
       final String srcParent = "/srcdat";
@@ -548,45 +596,44 @@ public class TestCopyFiles extends TestCase {
       URI srcUri = srcFs.getUri();
       URI destUri = destFs.getUri();
 
-      final boolean isSrcLocalFs = srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
+      final boolean isSrcLocalFs =
+          srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
 
       final FileSystem localFs = FileSystem.get(LOCAL_FS_URI, conf);
       String prevStagingArea =
           conf.get(JT_STAGING_AREA_ROOT, JT_STAGING_AREA_ROOT_DEFAULT);
-      String newStagingArea = (isSrcLocalFs? source : destination);
+      String newStagingArea = (isSrcLocalFs ? source : destination);
       newStagingArea += "/STAGING";
       conf.set(JT_STAGING_AREA_ROOT, TEST_ROOT_DIR + newStagingArea);
-        
-      final String srcParentPrefix = isSrcLocalFs? TEST_ROOT_DIR : "";
-      final String destParentPrefix = isSrcLocalFs? "" : TEST_ROOT_DIR;
- 
+
+      final String srcParentPrefix = isSrcLocalFs ? TEST_ROOT_DIR : "";
+      final String destParentPrefix = isSrcLocalFs ? "" : TEST_ROOT_DIR;
+
       String createDelSrcParent = srcParentPrefix + srcParent;
       String createDelDestParent = destParentPrefix + destParent;
       String createDelSrc = createDelSrcParent + fileDir;
       String createDelDest = createDelDestParent + fileDir;
-      
+
       MyFile[] srcFiles = createFiles(srcUri, createDelSrc);
       createFiles(destUri, createDelDest);
 
-      String distcpSrc = String.valueOf(srcUri) + createDelSrc;     
+      String distcpSrc = String.valueOf(srcUri) + createDelSrc;
       String distcpDest = String.valueOf(destUri) + createDelDest;
-      
-      ToolRunner.run(new DistCp(conf), new String[] {
-        "-log",
-        LOCAL_FS_STR + logDir,
-        "-update",
-        "-delete",
-        distcpSrc,
-        distcpDest});
-          
-        assertTrue("Source and destination directories do not match.",
-            checkFiles(destFs, createDelDest, srcFiles));
 
-        deldir(localFs, logDir);
-        deldir(srcFs, createDelSrcParent);
-        deldir(destFs, createDelDestParent);
+      List<String> argList = new ArrayList<String>();
+      addToArgList(argList, "-log", LOCAL_FS_STR + logDir, "-update", "-delete");
+      addSrcDstToArgList(argList, skipTmp, distcpDest, distcpSrc);
+      ToolRunner.run(new DistCp(conf),
+          argList.toArray(new String[argList.size()]));
+
+      assertTrue("Source and destination directories do not match.",
+          checkFiles(destFs, createDelDest, srcFiles));
+
+      deldir(localFs, logDir);
+      deldir(srcFs, createDelSrcParent);
+      deldir(destFs, createDelDestParent);
 
-        conf.set(JT_STAGING_AREA_ROOT, prevStagingArea); 
+      conf.set(JT_STAGING_AREA_ROOT, prevStagingArea);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -598,37 +645,108 @@ public class TestCopyFiles extends TestCase {
    * test copying files from local file system to dfs file system with staging
    * area in src
    */
-  public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+  private void testCopyFromLocalToDfsWithStagingAreaInSrc(boolean skipTmp)
+      throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
 
     String namenode = FileSystem.getDefaultUri(conf).toString();
-    assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
-    
+    assertTrue("Name node doesn't start with hdfs://",
+        namenode.startsWith("hdfs://"));
+
     final FileSystem srcFs = FileSystem.get(LOCAL_FS_URI, conf);
     final FileSystem destFs = cluster.getFileSystem();
-    
-    stagingAreaTest(srcFs, destFs, cluster, conf);
+
+    stagingAreaTest(srcFs, destFs, cluster, conf, skipTmp);
+  }
+
+  public void testCopyFromLocalToDfsWithStagingAreaInSrcSkiptmp()
+      throws Exception {
+    testCopyFromLocalToDfsWithStagingAreaInSrc(true);
+  }
+
+  public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+    testCopyFromLocalToDfsWithStagingAreaInSrc(false);
   }
 
   /**
    * test copying files from dfs file system to local file system with staging
-   * area in dest
+   * area in dest and setting skiptmp flag as needed
    */
-  public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+  public void testCopyFromDfsToLocalWithStagingAreaInDest(boolean skipTmp)
+      throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
-    
+
     String namenode = FileSystem.getDefaultUri(conf).toString();
-    assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
-    
+    assertTrue("Name node doesn't start with hdfs://",
+        namenode.startsWith("hdfs://"));
+
     final FileSystem srcFs = cluster.getFileSystem();
     final FileSystem destFs = FileSystem.get(LOCAL_FS_URI, conf);
-    
-    stagingAreaTest(srcFs, destFs, cluster, conf);
+
+    stagingAreaTest(srcFs, destFs, cluster, conf, skipTmp);
+  }
+  
+  
+  /**
+   * test copying files from dfs file system to local file system with staging
+   * area in dest and skiptmp set
+   */
+  public void testCopyFromDfsToLocalWithStagingAreaInDestSkiptmp()
+    throws Exception {
+	testCopyFromDfsToLocalWithStagingAreaInDest(true);
+  }
+
+  /**
+   * test copying files from dfs file system to local file system with staging
+   * area in dest
+   */
+  public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+	testCopyFromDfsToLocalWithStagingAreaInDest(false);
+  }
+
+  /**
+   * test copying files from dfs file system to local file system with staging
+   * area in dest. Optionally set skiptmp flag
+   */
+  private void testCopyDuplication(boolean skipTmp) throws Exception {
+    final FileSystem localfs =
+        FileSystem.get(LOCAL_FS_URI, new Configuration());
+    try {
+      MyFile[] files = createFiles(localfs, TEST_ROOT_DIR + "/srcdat");
+      List<String> argList = new ArrayList<String>();
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/src2/srcdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat");
+      ToolRunner.run(new DistCp(new Configuration()),
+          argList.toArray(new String[argList.size()]));
+      assertTrue("Source and destination directories do not match.",
+          checkFiles(localfs, TEST_ROOT_DIR + "/src2/srcdat", files));
+      argList.clear();
+      
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/destdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat",
+          LOCAL_FS_STR + TEST_ROOT_DIR + "/src2/srcdat");
+      assertEquals(
+          DistCp.DuplicationException.ERROR_CODE,
+          ToolRunner.run(new DistCp(new Configuration()),
+              argList.toArray(new String[argList.size()])));
+    } finally {
+      deldir(localfs, TEST_ROOT_DIR + "/destdat");
+      deldir(localfs, TEST_ROOT_DIR + "/srcdat");
+      deldir(localfs, TEST_ROOT_DIR + "/src2");
+    }
   }
 
   public void testCopyDuplication() throws Exception {
+    testCopyDuplication(false);
+  }
+
+  public void testCopyDuplicationSkiptmp() throws Exception {
+    testCopyDuplication(true);
+  }
+
+  public void oldtestCopyDuplication() throws Exception {
     final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, new Configuration());
     try {    
       MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
@@ -651,54 +769,70 @@ public class TestCopyFiles extends TestCase {
     }
   }
 
-  public void testCopySingleFile() throws Exception {
+  private void testCopySingleFile(boolean skipTmp) throws Exception {
     FileSystem fs = FileSystem.get(LOCAL_FS_URI, new Configuration());
-    Path root = new Path(TEST_ROOT_DIR+"/srcdat");
-    try {    
-      MyFile[] files = {createFile(root, fs)};
-      //copy a dir with a single file
+    Path root = new Path(TEST_ROOT_DIR + "/srcdat");
+    try {
+      MyFile[] files = { createFile(root, fs) };
+      List<String> argList = new ArrayList<String>();
+      // copy a dir with a single file
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/destdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat");
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
+          argList.toArray(new String[argList.size()]));
       assertTrue("Source and destination directories do not match.",
-                 checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
-      
-      //copy a single file
+          checkFiles(fs, TEST_ROOT_DIR + "/destdat", files));
+      argList.clear();
+      // copy a single file
       String fname = files[0].getName();
       Path p = new Path(root, fname);
       FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/dest2/" + fname, LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/"
+          + fname);
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+fname,
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"+fname});
+          argList.toArray(new String[argList.size()]));
       assertTrue("Source and destination directories do not match.",
-          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));     
-      //copy single file to existing dir
-      deldir(fs, TEST_ROOT_DIR+"/dest2");
-      fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2"));
-      MyFile[] files2 = {createFile(root, fs, 0)};
+          checkFiles(fs, TEST_ROOT_DIR + "/dest2", files));
+      argList.clear();
+      // copy single file to existing dir
+      deldir(fs, TEST_ROOT_DIR + "/dest2");
+      fs.mkdirs(new Path(TEST_ROOT_DIR + "/dest2"));
+      MyFile[] files2 = { createFile(root, fs, 0) };
       String sname = files2[0].getName();
+      addToArgList(argList, "-update");
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/dest2/", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/" + sname);
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"-update",
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
+          argList.toArray(new String[argList.size()]));
       assertTrue("Source and destination directories do not match.",
-          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
-      updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
-      //copy single file to existing dir w/ dst name conflict
+          checkFiles(fs, TEST_ROOT_DIR + "/dest2", files2));
+      updateFiles(fs, TEST_ROOT_DIR + "/srcdat", files2, 1);
+      argList.clear();
+      // copy single file to existing dir w/ dst name conflict
+      addToArgList(argList, "-update");
+      addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+          + "/dest2/", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/" + sname);
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"-update",
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
-                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
+          argList.toArray(new String[argList.size()]));
       assertTrue("Source and destination directories do not match.",
-          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
-    }
-    finally {
-      deldir(fs, TEST_ROOT_DIR+"/destdat");
-      deldir(fs, TEST_ROOT_DIR+"/dest2");
-      deldir(fs, TEST_ROOT_DIR+"/srcdat");
+          checkFiles(fs, TEST_ROOT_DIR + "/dest2", files2));
+    } finally {
+      deldir(fs, TEST_ROOT_DIR + "/destdat");
+      deldir(fs, TEST_ROOT_DIR + "/dest2");
+      deldir(fs, TEST_ROOT_DIR + "/srcdat");
     }
   }
 
+  public void testCopySingleFile() throws Exception {
+    testCopySingleFile(false);
+  }
+
+  public void testCopySingleFileWithSkiptmp() throws Exception {
+    testCopySingleFile(true);
+  }
+  
+  
   public void testPreserveOption() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/src/tools/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/src/tools/org/apache/hadoop/tools/DistCp.java b/src/tools/org/apache/hadoop/tools/DistCp.java
index 09122cf..93ce18c 100644
--- a/src/tools/org/apache/hadoop/tools/DistCp.java
+++ b/src/tools/org/apache/hadoop/tools/DistCp.java
@@ -94,6 +94,8 @@ public class DistCp implements Tool {
     "\n                       -p alone is equivalent to -prbugp" +
     "\n-i                     Ignore failures" +
     "\n-log <logdir>          Write logs to <logdir>" +
+    "\n-skiptmp               Do not copy files to tmp directory and rename." +
+    "\n                       Instead, copy files directly to the final destination." +
     "\n-m <num_maps>          Maximum number of simultaneous copies" +
     "\n-overwrite             Overwrite destination" +
     "\n-update                Overwrite if src size different from dst size" +
@@ -118,7 +120,14 @@ public class DistCp implements Tool {
     "\n     specified with symbolic representation.  For examples," +
     "\n       1230k = 1230 * 1024 = 1259520" +
     "\n       891g = 891 * 1024^3 = 956703965184" +
-    
+
+    "\n\nNOTE 3: By default, distcp copies files to temporary area first, " + 
+    "\n     then renames to the final destination. Using -skiptmp " +
+    "\n     switch means that distcp copies files directly to the " +
+    "\n     destination. Recommend to use it only when you really need to " +
+    "\n     (such as to avoid copy/rename overhead in s3, where rename is " +
+    "\n     not natively supported), because it may cause damage to " +
+    "\n     existing destination file if distcp fails for some reason. " +
     "\n";
   
   private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
@@ -134,7 +143,8 @@ public class DistCp implements Tool {
     PRESERVE_STATUS("-p", NAME + ".preserve.status"),
     OVERWRITE("-overwrite", NAME + ".overwrite.always"),
     UPDATE("-update", NAME + ".overwrite.ifnewer"),
-    SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check");
+    SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check"),
+    SKIPTMP("-skiptmp", NAME + ".skip.tmp");
 
     final String cmd, propertyname;
 
@@ -327,6 +337,7 @@ public class DistCp implements Tool {
     private byte[] buffer = null;
     private JobConf job;
     private boolean skipCRCCheck = false;
+    private boolean skipTmp = false;
 
     // stats
     private int failcount = 0;
@@ -382,10 +393,17 @@ public class DistCp implements Tool {
     private void copy(FileStatus srcstat, Path relativedst,
         OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
         throws IOException {
-      Path absdst = new Path(destPath, relativedst);
       int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
       assert totfiles >= 0 : "Invalid file count " + totfiles;
 
+      // if we are copying a single file and the dest doesn't exist, we
+      // treat it as a copy/rename. The relativedst becomes the new
+      // filename and the destPath becomes its parent directory.
+      if (totfiles == 1 && !destFileSys.exists(destPath)) {
+        relativedst = new Path(destPath.getName());
+        destPath = destPath.getParent();
+      }
+      Path  absdst = new Path(destPath, relativedst);
       // if a directory, ensure created even if empty
       if (srcstat.isDir()) {
         if (destFileSys.exists(absdst)) {
@@ -444,23 +462,16 @@ public class DistCp implements Tool {
             + " from " + srcstat.getPath());        
       }
       else {
-        if (totfiles == 1) {
-          // Copying a single file; use dst path provided by user as destination
-          // rather than destination directory, if a file
-          Path dstparent = absdst.getParent();
-          if (!(destFileSys.exists(dstparent) &&
-                destFileSys.getFileStatus(dstparent).isDir())) {
-            absdst = dstparent;
-          }
-        }
         if (destFileSys.exists(absdst) &&
             destFileSys.getFileStatus(absdst).isDir()) {
           throw new IOException(absdst + " is a directory");
         }
         if (!destFileSys.mkdirs(absdst.getParent())) {
-          throw new IOException("Failed to craete parent dir: " + absdst.getParent());
+          throw new IOException("Failed to create parent dir: " + absdst.getParent());
+        }
+        if (!skipTmp){
+            rename(tmpfile, absdst);
         }
-        rename(tmpfile, absdst);
 
         FileStatus dststat = destFileSys.getFileStatus(absdst);
         if (dststat.getLen() != srcstat.getLen()) {
@@ -530,6 +541,7 @@ public class DistCp implements Tool {
       update = job.getBoolean(Options.UPDATE.propertyname, false);
       overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
       skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false);
+      skipTmp = job.getBoolean(Options.SKIPTMP.propertyname, false);
       this.job = job;
     }
 
@@ -654,7 +666,6 @@ public class DistCp implements Tool {
     LOG.info("destPath=" + args.dst);
 
     JobConf job = createJobConf(conf);
-    
     checkSrcPath(job, args.srcs);
     if (args.preservedAttributes != null) {
       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
@@ -671,7 +682,9 @@ public class DistCp implements Tool {
       finalize(conf, job, args.dst, args.preservedAttributes);
     } finally {
       //delete tmp
-      fullyDelete(job.get(TMP_DIR_LABEL), job);
+      if(!args.flags.contains(Options.SKIPTMP)) {
+        fullyDelete(job.get(TMP_DIR_LABEL), job);
+      }
       //delete jobDirectory
       fullyDelete(job.get(JOB_DIR_LABEL), job);
     }
@@ -945,7 +958,7 @@ public class DistCp implements Tool {
    * command line) and at most (distcp.max.map.tasks, default
    * MAX_MAPS_PER_NODE * nodes in the cluster).
    * @param totalBytes Count of total bytes for job
-   * @param job The job to configure
+   * @param job The job configuration
    * @return Count of maps to run.
    */
   private static void setMapCount(long totalBytes, JobConf job) 
@@ -962,7 +975,9 @@ public class DistCp implements Tool {
   static void fullyDelete(String dir, Configuration conf) throws IOException {
     if (dir != null) {
       Path tmp = new Path(dir);
-      tmp.getFileSystem(conf).delete(tmp, true);
+      if (tmp.getFileSystem(conf).exists(tmp)){
+        tmp.getFileSystem(conf).delete(tmp, true);
+      }
     }
   }
 
@@ -1013,9 +1028,11 @@ public class DistCp implements Tool {
     //set boolean values
     final boolean update = args.flags.contains(Options.UPDATE);
     final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC);
+    final boolean skipTmp = args.flags.contains(Options.SKIPTMP);
     final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
     jobConf.setBoolean(Options.UPDATE.propertyname, update);
     jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck);
+    jobConf.setBoolean(Options.SKIPTMP.propertyname, skipTmp);
     jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
     jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
         args.flags.contains(Options.IGNORE_READ_FAILURES));
@@ -1205,9 +1222,10 @@ public class DistCp implements Tool {
           jobfs, jobDirectory, jobConf, conf);
     }
 
-    Path tmpDir = new Path(
-        (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
-        args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
+    String tmpDirPrefix = (dstExists && !dstIsDir) || (!dstExists && srcCount == 1) ? 
+    		args.dst.getParent().toString() : args.dst.toString();
+    Path tmpDir = new Path(tmpDirPrefix + (skipTmp? "" : "/_distcp_tmp_" + randomId));
+
     jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
     LOG.info("sourcePathsCount=" + srcCount);
     LOG.info("filesToCopyCount=" + fileCount);