You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yj...@apache.org on 2015/03/06 01:26:30 UTC
hadoop git commit: HDFS-7312. Update DistCp v1 to optionally not use
tmp location. (Joseph Prosser via yzhangal)
Repository: hadoop
Updated Branches:
refs/heads/branch-1 eb5191195 -> 22543d34b
HDFS-7312. Update DistCp v1 to optionally not use tmp location. (Joseph Prosser via yzhangal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22543d34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22543d34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22543d34
Branch: refs/heads/branch-1
Commit: 22543d34bc5ca83c242fae075d42d705a4c0c34d
Parents: eb51911
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Tue Mar 3 20:08:17 2015 -0800
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Thu Mar 5 15:53:22 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/hadoop/fs/TestCopyFiles.java | 414 ++++++++++++-------
src/tools/org/apache/hadoop/tools/DistCp.java | 60 ++-
3 files changed, 316 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3903f0..786ab47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -265,6 +265,9 @@ Release 1.3.0 - unreleased
MAPREDUCE-6198. NPE from JobTracker#resolveAndAddToTopology in MR1 cause
initJob and heartbeat failure. (zxu via rkanter)
+ HDFS-7312. Update DistCp v1 to optionally not use tmp location.
+ (Joseph Prosser via yzhangal)
+
Release 1.2.2 - unreleased
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/src/test/org/apache/hadoop/fs/TestCopyFiles.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/hadoop/fs/TestCopyFiles.java b/src/test/org/apache/hadoop/fs/TestCopyFiles.java
index 9ce0769..0fe63d4 100644
--- a/src/test/org/apache/hadoop/fs/TestCopyFiles.java
+++ b/src/test/org/apache/hadoop/fs/TestCopyFiles.java
@@ -308,8 +308,26 @@ public class TestCopyFiles extends TestCase {
deldir(localfs, TEST_ROOT_DIR+"/srcdat");
}
- /** copy files from dfs file system to dfs file system */
- public void testCopyFromDfsToDfs() throws Exception {
+ private static void addToArgList(List<String> argList, final String... args) {
+ for (String arg : args) {
+ argList.add(arg);
+ }
+ }
+
+ private void addSrcDstToArgList(List<String> argList, final boolean skipTmp,
+ final String dst, final String... srcs) {
+ if (skipTmp) {
+ argList.add("-skiptmp");
+ }
+ addToArgList(argList, srcs);
+ argList.add(dst);
+ }
+
+ /**
+ * copy files from dfs file system to dfs file system, optionally passing
+ * the -skiptmp flag
+ */
+ private void testCopyFromDfsToDfs(boolean skipTmp) throws Exception {
String namenode = null;
MiniDFSCluster cluster = null;
try {
@@ -319,24 +337,37 @@ public class TestCopyFiles extends TestCase {
namenode = FileSystem.getDefaultUri(conf).toString();
if (namenode.startsWith("hdfs://")) {
MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
- ToolRunner.run(new DistCp(conf), new String[] {
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
+ List<String> argList = new ArrayList<String>();
+ addToArgList(argList, "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(hdfs, "/destdat", files));
- FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
+ checkFiles(hdfs, "/destdat", files));
+ FileSystem fs = FileSystem.get(URI.create(namenode + "/logs"), conf);
assertTrue("Log directory does not exist.",
- fs.exists(new Path(namenode+"/logs")));
+ fs.exists(new Path(namenode + "/logs")));
deldir(hdfs, "/destdat");
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
} finally {
- if (cluster != null) { cluster.shutdown(); }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
+
+ /** copy files from dfs file system to dfs file system */
+ public void testCopyFromDfsToDfs() throws Exception {
+ testCopyFromDfsToDfs(false);
+ }
+
+ /** copy files from dfs file system to dfs file system with skiptmp */
+ public void testCopyFromDfsToDfsWithSkiptmp() throws Exception {
+ testCopyFromDfsToDfs(true);
+ }
/** copy files from local file system to dfs file system */
public void testCopyFromLocalToDfs() throws Exception {
@@ -362,7 +393,9 @@ public class TestCopyFiles extends TestCase {
deldir(FileSystem.get(LOCAL_FS_URI, conf), TEST_ROOT_DIR+"/srcdat");
}
} finally {
- if (cluster != null) { cluster.shutdown(); }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
@@ -395,7 +428,8 @@ public class TestCopyFiles extends TestCase {
}
}
- public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+ private void testCopyDfsToDfsUpdateOverwrite(boolean skipTmp)
+ throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
@@ -404,75 +438,86 @@ public class TestCopyFiles extends TestCase {
final String namenode = hdfs.getUri().toString();
if (namenode.startsWith("hdfs://")) {
MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
- ToolRunner.run(new DistCp(conf), new String[] {
- "-p",
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
+ // run without update
+ List<String> argList = new ArrayList<String>();
+ addToArgList(argList, "-p", "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(hdfs, "/destdat", files));
- FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
+ checkFiles(hdfs, "/destdat", files));
+ FileSystem fs = FileSystem.get(URI.create(namenode + "/logs"), conf);
assertTrue("Log directory does not exist.",
- fs.exists(new Path(namenode+"/logs")));
+ fs.exists(new Path(namenode + "/logs")));
FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files);
- final int nupdate = NFILES>>2;
+ final int nupdate = NFILES >> 2;
updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate);
deldir(hdfs, "/logs");
-
- ToolRunner.run(new DistCp(conf), new String[] {
- "-p",
- "-update",
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
+ argList.clear();
+ // run with update
+ addToArgList(argList, "-p", "-update", "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(hdfs, "/destdat", files));
+ checkFiles(hdfs, "/destdat", files));
assertTrue("Update failed to replicate all changes in src",
- checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
+ checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
deldir(hdfs, "/logs");
- ToolRunner.run(new DistCp(conf), new String[] {
- "-p",
- "-overwrite",
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
+ argList.clear();
+ // run with overwrite
+ addToArgList(argList, "-p", "-overwrite", "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(hdfs, "/destdat", files));
+ checkFiles(hdfs, "/destdat", files));
assertTrue("-overwrite didn't.",
- checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
+ checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
deldir(hdfs, "/destdat");
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
} finally {
- if (cluster != null) { cluster.shutdown(); }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
- public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+ public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+ testCopyDfsToDfsUpdateOverwrite(false);
+ }
+
+ public void testCopyDfsToDfsUpdateOverwriteSkiptmp() throws Exception {
+ testCopyDfsToDfsUpdateOverwrite(true);
+ }
+
+ private void testCopyDfsToDfsUpdateWithSkipCRC(boolean skipTmp)
+ throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = hdfs.getUri().toString();
-
+
FileSystem fs = FileSystem.get(URI.create(namenode), new Configuration());
// Create two files of the same name, same length but different
// contents
final String testfilename = "test";
final String srcData = "act act act";
final String destData = "cat cat cat";
-
+
if (namenode.startsWith("hdfs://")) {
- deldir(hdfs,"/logs");
-
+ deldir(hdfs, "/logs");
+
Path srcPath = new Path("/srcdat", testfilename);
Path destPath = new Path("/destdat", testfilename);
FSDataOutputStream out = fs.create(srcPath, true);
@@ -482,17 +527,13 @@ public class TestCopyFiles extends TestCase {
out = fs.create(destPath, true);
out.writeUTF(destData);
out.close();
-
// Run with -skipcrccheck option
- ToolRunner.run(new DistCp(conf), new String[] {
- "-p",
- "-update",
- "-skipcrccheck",
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
-
+ List<String> argList = new ArrayList<String>();
+ addToArgList(argList, "-p", "-update", "-skipcrccheck", "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
// File should not be overwritten
FSDataInputStream in = hdfs.open(destPath);
String s = in.readUTF();
@@ -500,18 +541,14 @@ public class TestCopyFiles extends TestCase {
assertTrue("Dest got over written even with skip crc",
s.equalsIgnoreCase(destData));
in.close();
-
deldir(hdfs, "/logs");
-
- // Run without the option
- ToolRunner.run(new DistCp(conf), new String[] {
- "-p",
- "-update",
- "-log",
- namenode+"/logs",
- namenode+"/srcdat",
- namenode+"/destdat"});
-
+ argList.clear();
+ // Run without the option
+ addToArgList(argList, "-p", "-update", "-log", namenode + "/logs");
+ addSrcDstToArgList(argList, skipTmp, namenode + "/destdat", namenode
+ + "/srcdat");
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
// File should be overwritten
in = hdfs.open(destPath);
s = in.readUTF();
@@ -524,18 +561,29 @@ public class TestCopyFiles extends TestCase {
deldir(hdfs, "/destdat");
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
- }
+ }
} finally {
- if (cluster != null) { cluster.shutdown(); }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
+ public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+ testCopyDfsToDfsUpdateWithSkipCRC(false);
+ }
+
+ public void testCopyDfsToDfsUpdateWithSkipCRCSkiptmp() throws Exception {
+ testCopyDfsToDfsUpdateWithSkipCRC(true);
+ }
+
/**
* A helper function to test copying files between local file system and dfs
- * file system, with staging area set to local file system.
+ * file system, with staging area set to local file system.
*/
private void stagingAreaTest(final FileSystem srcFs, final FileSystem destFs,
- MiniDFSCluster cluster, Configuration conf) throws Exception {
+ MiniDFSCluster cluster, Configuration conf, boolean skipTmp)
+ throws Exception {
try {
final String fileDir = "/files";
final String srcParent = "/srcdat";
@@ -548,45 +596,44 @@ public class TestCopyFiles extends TestCase {
URI srcUri = srcFs.getUri();
URI destUri = destFs.getUri();
- final boolean isSrcLocalFs = srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
+ final boolean isSrcLocalFs =
+ srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
final FileSystem localFs = FileSystem.get(LOCAL_FS_URI, conf);
String prevStagingArea =
conf.get(JT_STAGING_AREA_ROOT, JT_STAGING_AREA_ROOT_DEFAULT);
- String newStagingArea = (isSrcLocalFs? source : destination);
+ String newStagingArea = (isSrcLocalFs ? source : destination);
newStagingArea += "/STAGING";
conf.set(JT_STAGING_AREA_ROOT, TEST_ROOT_DIR + newStagingArea);
-
- final String srcParentPrefix = isSrcLocalFs? TEST_ROOT_DIR : "";
- final String destParentPrefix = isSrcLocalFs? "" : TEST_ROOT_DIR;
-
+
+ final String srcParentPrefix = isSrcLocalFs ? TEST_ROOT_DIR : "";
+ final String destParentPrefix = isSrcLocalFs ? "" : TEST_ROOT_DIR;
+
String createDelSrcParent = srcParentPrefix + srcParent;
String createDelDestParent = destParentPrefix + destParent;
String createDelSrc = createDelSrcParent + fileDir;
String createDelDest = createDelDestParent + fileDir;
-
+
MyFile[] srcFiles = createFiles(srcUri, createDelSrc);
createFiles(destUri, createDelDest);
- String distcpSrc = String.valueOf(srcUri) + createDelSrc;
+ String distcpSrc = String.valueOf(srcUri) + createDelSrc;
String distcpDest = String.valueOf(destUri) + createDelDest;
-
- ToolRunner.run(new DistCp(conf), new String[] {
- "-log",
- LOCAL_FS_STR + logDir,
- "-update",
- "-delete",
- distcpSrc,
- distcpDest});
-
- assertTrue("Source and destination directories do not match.",
- checkFiles(destFs, createDelDest, srcFiles));
- deldir(localFs, logDir);
- deldir(srcFs, createDelSrcParent);
- deldir(destFs, createDelDestParent);
+ List<String> argList = new ArrayList<String>();
+ addToArgList(argList, "-log", LOCAL_FS_STR + logDir, "-update", "-delete");
+ addSrcDstToArgList(argList, skipTmp, distcpDest, distcpSrc);
+ ToolRunner.run(new DistCp(conf),
+ argList.toArray(new String[argList.size()]));
+
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(destFs, createDelDest, srcFiles));
+
+ deldir(localFs, logDir);
+ deldir(srcFs, createDelSrcParent);
+ deldir(destFs, createDelDestParent);
- conf.set(JT_STAGING_AREA_ROOT, prevStagingArea);
+ conf.set(JT_STAGING_AREA_ROOT, prevStagingArea);
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -598,37 +645,108 @@ public class TestCopyFiles extends TestCase {
* test copying files from local file system to dfs file system with staging
* area in src
*/
- public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+ private void testCopyFromLocalToDfsWithStagingAreaInSrc(boolean skipTmp)
+ throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
String namenode = FileSystem.getDefaultUri(conf).toString();
- assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
-
+ assertTrue("Name node doesn't start with hdfs://",
+ namenode.startsWith("hdfs://"));
+
final FileSystem srcFs = FileSystem.get(LOCAL_FS_URI, conf);
final FileSystem destFs = cluster.getFileSystem();
-
- stagingAreaTest(srcFs, destFs, cluster, conf);
+
+ stagingAreaTest(srcFs, destFs, cluster, conf, skipTmp);
+ }
+
+ public void testCopyFromLocalToDfsWithStagingAreaInSrcSkiptmp()
+ throws Exception {
+ testCopyFromLocalToDfsWithStagingAreaInSrc(true);
+ }
+
+ public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+ testCopyFromLocalToDfsWithStagingAreaInSrc(false);
}
/**
* test copying files from dfs file system to local file system with staging
- * area in dest
+ * area in dest and setting skiptmp flag as needed
*/
- public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+ public void testCopyFromDfsToLocalWithStagingAreaInDest(boolean skipTmp)
+ throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
-
+
String namenode = FileSystem.getDefaultUri(conf).toString();
- assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
-
+ assertTrue("Name node doesn't start with hdfs://",
+ namenode.startsWith("hdfs://"));
+
final FileSystem srcFs = cluster.getFileSystem();
final FileSystem destFs = FileSystem.get(LOCAL_FS_URI, conf);
-
- stagingAreaTest(srcFs, destFs, cluster, conf);
+
+ stagingAreaTest(srcFs, destFs, cluster, conf, skipTmp);
+ }
+
+
+ /**
+ * test copying files from dfs file system to local file system with staging
+ * area in dest and skiptmp set
+ */
+ public void testCopyFromDfsToLocalWithStagingAreaInDestSkiptmp()
+ throws Exception {
+ testCopyFromDfsToLocalWithStagingAreaInDest(true);
+ }
+
+ /**
+ * test copying files from dfs file system to local file system with staging
+ * area in dest
+ */
+ public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+ testCopyFromDfsToLocalWithStagingAreaInDest(false);
+ }
+
+ /**
+ * test that copying two sources that share a final path component into one
+ * destination fails with DuplicationException. Optionally set skiptmp flag
+ */
+ private void testCopyDuplication(boolean skipTmp) throws Exception {
+ final FileSystem localfs =
+ FileSystem.get(LOCAL_FS_URI, new Configuration());
+ try {
+ MyFile[] files = createFiles(localfs, TEST_ROOT_DIR + "/srcdat");
+ List<String> argList = new ArrayList<String>();
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/src2/srcdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat");
+ ToolRunner.run(new DistCp(new Configuration()),
+ argList.toArray(new String[argList.size()]));
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(localfs, TEST_ROOT_DIR + "/src2/srcdat", files));
+ argList.clear();
+
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/destdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat",
+ LOCAL_FS_STR + TEST_ROOT_DIR + "/src2/srcdat");
+ assertEquals(
+ DistCp.DuplicationException.ERROR_CODE,
+ ToolRunner.run(new DistCp(new Configuration()),
+ argList.toArray(new String[argList.size()])));
+ } finally {
+ deldir(localfs, TEST_ROOT_DIR + "/destdat");
+ deldir(localfs, TEST_ROOT_DIR + "/srcdat");
+ deldir(localfs, TEST_ROOT_DIR + "/src2");
+ }
}
public void testCopyDuplication() throws Exception {
+ testCopyDuplication(false);
+ }
+
+ public void testCopyDuplicationSkiptmp() throws Exception {
+ testCopyDuplication(true);
+ }
+
+ public void oldtestCopyDuplication() throws Exception {
final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, new Configuration());
try {
MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
@@ -651,54 +769,70 @@ public class TestCopyFiles extends TestCase {
}
}
- public void testCopySingleFile() throws Exception {
+ private void testCopySingleFile(boolean skipTmp) throws Exception {
FileSystem fs = FileSystem.get(LOCAL_FS_URI, new Configuration());
- Path root = new Path(TEST_ROOT_DIR+"/srcdat");
- try {
- MyFile[] files = {createFile(root, fs)};
- //copy a dir with a single file
+ Path root = new Path(TEST_ROOT_DIR + "/srcdat");
+ try {
+ MyFile[] files = { createFile(root, fs) };
+ List<String> argList = new ArrayList<String>();
+ // copy a dir with a single file
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/destdat", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat");
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
- LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
-
- //copy a single file
+ checkFiles(fs, TEST_ROOT_DIR + "/destdat", files));
+ argList.clear();
+ // copy a single file
String fname = files[0].getName();
Path p = new Path(root, fname);
FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/dest2/" + fname, LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/"
+ + fname);
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+fname,
- LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"+fname});
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));
- //copy single file to existing dir
- deldir(fs, TEST_ROOT_DIR+"/dest2");
- fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2"));
- MyFile[] files2 = {createFile(root, fs, 0)};
+ checkFiles(fs, TEST_ROOT_DIR + "/dest2", files));
+ argList.clear();
+ // copy single file to existing dir
+ deldir(fs, TEST_ROOT_DIR + "/dest2");
+ fs.mkdirs(new Path(TEST_ROOT_DIR + "/dest2"));
+ MyFile[] files2 = { createFile(root, fs, 0) };
String sname = files2[0].getName();
+ addToArgList(argList, "-update");
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/dest2/", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/" + sname);
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"-update",
- LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
- LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));
- updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
- //copy single file to existing dir w/ dst name conflict
+ checkFiles(fs, TEST_ROOT_DIR + "/dest2", files2));
+ updateFiles(fs, TEST_ROOT_DIR + "/srcdat", files2, 1);
+ argList.clear();
+ // copy single file to existing dir w/ dst name conflict
+ addToArgList(argList, "-update");
+ addSrcDstToArgList(argList, skipTmp, LOCAL_FS_STR + TEST_ROOT_DIR
+ + "/dest2/", LOCAL_FS_STR + TEST_ROOT_DIR + "/srcdat/" + sname);
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"-update",
- LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
- LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
+ argList.toArray(new String[argList.size()]));
assertTrue("Source and destination directories do not match.",
- checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));
- }
- finally {
- deldir(fs, TEST_ROOT_DIR+"/destdat");
- deldir(fs, TEST_ROOT_DIR+"/dest2");
- deldir(fs, TEST_ROOT_DIR+"/srcdat");
+ checkFiles(fs, TEST_ROOT_DIR + "/dest2", files2));
+ } finally {
+ deldir(fs, TEST_ROOT_DIR + "/destdat");
+ deldir(fs, TEST_ROOT_DIR + "/dest2");
+ deldir(fs, TEST_ROOT_DIR + "/srcdat");
}
}
+ public void testCopySingleFile() throws Exception {
+ testCopySingleFile(false);
+ }
+
+ public void testCopySingleFileWithSkiptmp() throws Exception {
+ testCopySingleFile(true);
+ }
+
+
public void testPreserveOption() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22543d34/src/tools/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/src/tools/org/apache/hadoop/tools/DistCp.java b/src/tools/org/apache/hadoop/tools/DistCp.java
index 09122cf..93ce18c 100644
--- a/src/tools/org/apache/hadoop/tools/DistCp.java
+++ b/src/tools/org/apache/hadoop/tools/DistCp.java
@@ -94,6 +94,8 @@ public class DistCp implements Tool {
"\n -p alone is equivalent to -prbugp" +
"\n-i Ignore failures" +
"\n-log <logdir> Write logs to <logdir>" +
+ "\n-skiptmp Do not copy files to tmp directory and rename." +
+ "\n Instead, copy files directly to the final destination." +
"\n-m <num_maps> Maximum number of simultaneous copies" +
"\n-overwrite Overwrite destination" +
"\n-update Overwrite if src size different from dst size" +
@@ -118,7 +120,14 @@ public class DistCp implements Tool {
"\n specified with symbolic representation. For examples," +
"\n 1230k = 1230 * 1024 = 1259520" +
"\n 891g = 891 * 1024^3 = 956703965184" +
-
+
+ "\n\nNOTE 3: By default, distcp copies files to temporary area first, " +
+ "\n then renames to the final destination. Using -skiptmp " +
+ "\n switch means that distcp copies files directly to the " +
+ "\n destination. Recommend to use it only when you really need to " +
+ "\n (such as to avoid copy/rename overhead in s3, where rename is " +
+ "\n not natively supported), because it may cause damage to " +
+ "\n existing destination file if distcp fails for some reason. " +
"\n";
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
@@ -134,7 +143,8 @@ public class DistCp implements Tool {
PRESERVE_STATUS("-p", NAME + ".preserve.status"),
OVERWRITE("-overwrite", NAME + ".overwrite.always"),
UPDATE("-update", NAME + ".overwrite.ifnewer"),
- SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check");
+ SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check"),
+ SKIPTMP("-skiptmp", NAME + ".skip.tmp");
final String cmd, propertyname;
@@ -327,6 +337,7 @@ public class DistCp implements Tool {
private byte[] buffer = null;
private JobConf job;
private boolean skipCRCCheck = false;
+ private boolean skipTmp = false;
// stats
private int failcount = 0;
@@ -382,10 +393,17 @@ public class DistCp implements Tool {
private void copy(FileStatus srcstat, Path relativedst,
OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
throws IOException {
- Path absdst = new Path(destPath, relativedst);
int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
assert totfiles >= 0 : "Invalid file count " + totfiles;
+ // if we are copying a single file and the dest doesn't exist, we
+ // treat it as a copy/rename. The relativedst becomes the new
+ // filename and the destPath becomes its parent directory.
+ if (totfiles == 1 && !destFileSys.exists(destPath)) {
+ relativedst = new Path(destPath.getName());
+ destPath = destPath.getParent();
+ }
+ Path absdst = new Path(destPath, relativedst);
// if a directory, ensure created even if empty
if (srcstat.isDir()) {
if (destFileSys.exists(absdst)) {
@@ -444,23 +462,16 @@ public class DistCp implements Tool {
+ " from " + srcstat.getPath());
}
else {
- if (totfiles == 1) {
- // Copying a single file; use dst path provided by user as destination
- // rather than destination directory, if a file
- Path dstparent = absdst.getParent();
- if (!(destFileSys.exists(dstparent) &&
- destFileSys.getFileStatus(dstparent).isDir())) {
- absdst = dstparent;
- }
- }
if (destFileSys.exists(absdst) &&
destFileSys.getFileStatus(absdst).isDir()) {
throw new IOException(absdst + " is a directory");
}
if (!destFileSys.mkdirs(absdst.getParent())) {
- throw new IOException("Failed to craete parent dir: " + absdst.getParent());
+ throw new IOException("Failed to create parent dir: " + absdst.getParent());
+ }
+ if (!skipTmp){
+ rename(tmpfile, absdst);
}
- rename(tmpfile, absdst);
FileStatus dststat = destFileSys.getFileStatus(absdst);
if (dststat.getLen() != srcstat.getLen()) {
@@ -530,6 +541,7 @@ public class DistCp implements Tool {
update = job.getBoolean(Options.UPDATE.propertyname, false);
overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false);
+ skipTmp = job.getBoolean(Options.SKIPTMP.propertyname, false);
this.job = job;
}
@@ -654,7 +666,6 @@ public class DistCp implements Tool {
LOG.info("destPath=" + args.dst);
JobConf job = createJobConf(conf);
-
checkSrcPath(job, args.srcs);
if (args.preservedAttributes != null) {
job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
@@ -671,7 +682,9 @@ public class DistCp implements Tool {
finalize(conf, job, args.dst, args.preservedAttributes);
} finally {
//delete tmp
- fullyDelete(job.get(TMP_DIR_LABEL), job);
+ if(!args.flags.contains(Options.SKIPTMP)) {
+ fullyDelete(job.get(TMP_DIR_LABEL), job);
+ }
//delete jobDirectory
fullyDelete(job.get(JOB_DIR_LABEL), job);
}
@@ -945,7 +958,7 @@ public class DistCp implements Tool {
* command line) and at most (distcp.max.map.tasks, default
* MAX_MAPS_PER_NODE * nodes in the cluster).
* @param totalBytes Count of total bytes for job
- * @param job The job to configure
+ * @param job The job configuration
* @return Count of maps to run.
*/
private static void setMapCount(long totalBytes, JobConf job)
@@ -962,7 +975,9 @@ public class DistCp implements Tool {
static void fullyDelete(String dir, Configuration conf) throws IOException {
if (dir != null) {
Path tmp = new Path(dir);
- tmp.getFileSystem(conf).delete(tmp, true);
+ if (tmp.getFileSystem(conf).exists(tmp)){
+ tmp.getFileSystem(conf).delete(tmp, true);
+ }
}
}
@@ -1013,9 +1028,11 @@ public class DistCp implements Tool {
//set boolean values
final boolean update = args.flags.contains(Options.UPDATE);
final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC);
+ final boolean skipTmp = args.flags.contains(Options.SKIPTMP);
final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
jobConf.setBoolean(Options.UPDATE.propertyname, update);
jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck);
+ jobConf.setBoolean(Options.SKIPTMP.propertyname, skipTmp);
jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
args.flags.contains(Options.IGNORE_READ_FAILURES));
@@ -1205,9 +1222,10 @@ public class DistCp implements Tool {
jobfs, jobDirectory, jobConf, conf);
}
- Path tmpDir = new Path(
- (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
- args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
+ String tmpDirPrefix = (dstExists && !dstIsDir) || (!dstExists && srcCount == 1) ?
+ args.dst.getParent().toString() : args.dst.toString();
+ Path tmpDir = new Path(tmpDirPrefix + (skipTmp? "" : "/_distcp_tmp_" + randomId));
+
jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
LOG.info("sourcePathsCount=" + srcCount);
LOG.info("filesToCopyCount=" + fileCount);