You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/08/14 01:35:06 UTC
svn commit: r685727 - in /hadoop/core/trunk: CHANGES.txt
src/core/org/apache/hadoop/util/StringUtils.java
src/test/org/apache/hadoop/fs/TestCopyFiles.java
src/test/org/apache/hadoop/util/TestStringUtils.java
src/tools/org/apache/hadoop/tools/DistCp.java
Author: cdouglas
Date: Wed Aug 13 16:35:05 2008
New Revision: 685727
URL: http://svn.apache.org/viewvc?rev=685727&view=rev
Log:
HADOOP-3873. Add -filelimit and -sizelimit options to distcp to cap the number
of files/bytes copied in a particular run to support incremental updates and
mirroring. Contributed by TszWo (Nicholas), SZE.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java
hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 13 16:35:05 2008
@@ -82,6 +82,10 @@
analysis framework. (Jerome Boulon, Andy Konwinski, Ari Rabkin,
and Eric Yang)
+ HADOOP-3873. Add -filelimit and -sizelimit options to distcp to cap the
+ number of files/bytes copied in a particular run to support incremental
+ updates and mirroring. (TszWo (Nicholas), SZE via cdouglas)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java Wed Aug 13 16:35:05 2008
@@ -466,7 +466,7 @@
* @param args arguments
* @param LOG the target log object
*/
- public static void startupShutdownMessage(Class clazz, String[] args,
+ public static void startupShutdownMessage(Class<?> clazz, String[] args,
final org.apache.commons.logging.Log LOG) {
final String hostname = getHostname();
final String classname = clazz.getSimpleName();
@@ -490,4 +490,62 @@
}
});
}
+
+ /**
+ * The traditional binary prefixes, kilo, mega, ..., exa,
+ * which can be represented by a 64-bit integer.
+ * TraditionalBinaryPrefix symbol are case insensitive.
+ */
+ public static enum TraditionalBinaryPrefix {
+ KILO(1024),
+ MEGA(KILO.value << 10),
+ GIGA(MEGA.value << 10),
+ TERA(GIGA.value << 10),
+ PETA(TERA.value << 10),
+ EXA(PETA.value << 10);
+
+ public final long value;
+ public final long symbol;
+
+ TraditionalBinaryPrefix(long value) {
+ this.value = value;
+ this.symbol = toString().charAt(0);
+ }
+
+ /**
+ * @return The TraditionalBinaryPrefix object corresponding to the symbol.
+ */
+ public static TraditionalBinaryPrefix valueOf(char symbol) {
+ symbol = Character.toUpperCase(symbol);
+ for(TraditionalBinaryPrefix prefix : TraditionalBinaryPrefix.values()) {
+ if (symbol == prefix.symbol) {
+ return prefix;
+ }
+ }
+ throw new IllegalArgumentException("Unknown symbol '" + symbol + "'");
+ }
+
+ /**
+ * Convert a string to long.
+ * The input string is first be trimmed
+ * and then it is parsed with traditional binary prefix.
+ *
+ * For example,
+ * "-1230k" will be converted to -1230 * 1024 = -1259520;
+ * "891g" will be converted to 891 * 1024^3 = 956703965184;
+ *
+ * @param s input string
+ * @return a long value represented by the input string.
+ */
+ public static long string2long(String s) {
+ s = s.trim();
+ final int lastpos = s.length() - 1;
+ final char lastchar = s.charAt(lastpos);
+ if (Character.isDigit(lastchar))
+ return Long.parseLong(s);
+ else
+ return TraditionalBinaryPrefix.valueOf(lastchar).value
+ * Long.parseLong(s.substring(0, lastpos));
+ }
+ }
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Wed Aug 13 16:35:05 2008
@@ -18,15 +18,24 @@
package org.apache.hadoop.fs;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import junit.framework.TestCase;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.tools.DistCp;
@@ -37,9 +46,17 @@
* A JUnit test for copying files recursively.
*/
public class TestCopyFiles extends TestCase {
+ {
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
+ ).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
+ }
static final URI LOCAL_FS = URI.create("file:///");
+ private static final Random RAN = new Random();
private static final int NFILES = 20;
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
@@ -123,9 +140,15 @@
return createFile(root, fs, -1);
}
- /** check if the files have been copied correctly. */
- private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
+ /** Same as checkFiles(fsname, topdir, files, false); */
+ private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
throws IOException {
+ return checkFiles(fsname, topdir, files, false);
+ }
+
+ /** check if the files have been copied correctly. */
+ private static boolean checkFiles(String fsname, String topdir, MyFile[] files,
+ boolean existingOnly) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getNamed(fsname, conf);
@@ -133,20 +156,28 @@
for (int idx = 0; idx < files.length; idx++) {
Path fPath = new Path(root, files[idx].getName());
- FSDataInputStream in = fs.open(fPath);
- byte[] toRead = new byte[files[idx].getSize()];
- byte[] toCompare = new byte[files[idx].getSize()];
- Random rb = new Random(files[idx].getSeed());
- rb.nextBytes(toCompare);
- assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
- in.close();
- for (int i = 0; i < toRead.length; i++) {
- if (toRead[i] != toCompare[i]) {
- return false;
+ try {
+ fs.getFileStatus(fPath);
+ FSDataInputStream in = fs.open(fPath);
+ byte[] toRead = new byte[files[idx].getSize()];
+ byte[] toCompare = new byte[files[idx].getSize()];
+ Random rb = new Random(files[idx].getSeed());
+ rb.nextBytes(toCompare);
+ assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+ in.close();
+ for (int i = 0; i < toRead.length; i++) {
+ if (toRead[i] != toCompare[i]) {
+ return false;
+ }
+ }
+ toRead = null;
+ toCompare = null;
+ }
+ catch(FileNotFoundException fnfe) {
+ if (!existingOnly) {
+ throw fnfe;
}
}
- toRead = null;
- toCompare = null;
}
return true;
@@ -176,14 +207,24 @@
private static FileStatus[] getFileStatus(String namenode,
String topdir, MyFile[] files) throws IOException {
+ return getFileStatus(namenode, topdir, files, false);
+ }
+ private static FileStatus[] getFileStatus(String namenode,
+ String topdir, MyFile[] files, boolean existingOnly) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getNamed(namenode, conf);
Path root = new Path(topdir);
- FileStatus[] ret = new FileStatus[NFILES];
+ List<FileStatus> statuses = new ArrayList<FileStatus>();
for (int idx = 0; idx < NFILES; ++idx) {
- ret[idx] = fs.getFileStatus(new Path(root, files[idx].getName()));
+ try {
+ statuses.add(fs.getFileStatus(new Path(root, files[idx].getName())));
+ } catch(FileNotFoundException fnfe) {
+ if (!existingOnly) {
+ throw fnfe;
+ }
+ }
}
- return ret;
+ return statuses.toArray(new FileStatus[statuses.size()]);
}
private static boolean checkUpdate(FileStatus[] old, String namenode,
@@ -567,4 +608,85 @@
}
}
+ public void testLimits() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ final String nnUri = FileSystem.getDefaultUri(conf).toString();
+ final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+ final DistCp distcp = new DistCp(conf);
+
+ final String srcrootdir = "/src_root";
+ final Path srcrootpath = new Path(srcrootdir);
+ final String dstrootdir = "/dst_root";
+ final Path dstrootpath = new Path(dstrootdir);
+
+ {//test -filelimit
+ MyFile[] files = createFiles(URI.create(nnUri), srcrootdir);
+ int filelimit = files.length / 2;
+ System.out.println("filelimit=" + filelimit);
+
+ ToolRunner.run(distcp,
+ new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
+
+ FileStatus[] dststat = getFileStatus(nnUri, dstrootdir, files, true);
+ assertEquals(filelimit, dststat.length);
+ deldir(nnUri, dstrootdir);
+ deldir(nnUri, srcrootdir);
+ }
+
+ {//test -sizelimit
+ createFiles(URI.create(nnUri), srcrootdir);
+ long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2;
+ System.out.println("sizelimit=" + sizelimit);
+
+ ToolRunner.run(distcp,
+ new String[]{"-sizelimit", ""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir});
+
+ ContentSummary summary = fs.getContentSummary(dstrootpath);
+ System.out.println("summary=" + summary);
+ assertTrue(summary.getLength() <= sizelimit);
+ deldir(nnUri, dstrootdir);
+ deldir(nnUri, srcrootdir);
+ }
+
+ {//test update
+ final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir);
+ final long totalsize = fs.getContentSummary(srcrootpath).getLength();
+ System.out.println("src.length=" + srcs.length);
+ System.out.println("totalsize =" + totalsize);
+ fs.mkdirs(dstrootpath);
+ final int parts = RAN.nextInt(NFILES/3 - 1) + 2;
+ final int filelimit = srcs.length/parts;
+ final long sizelimit = totalsize/parts;
+ System.out.println("filelimit=" + filelimit);
+ System.out.println("sizelimit=" + sizelimit);
+ System.out.println("parts =" + parts);
+ final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit,
+ "-update", nnUri+srcrootdir, nnUri+dstrootdir};
+
+ int dstfilecount = 0;
+ long dstsize = 0;
+ for(int i = 0; i <= parts; i++) {
+ ToolRunner.run(distcp, args);
+
+ FileStatus[] dststat = getFileStatus(nnUri, dstrootdir, srcs, true);
+ System.out.println(i + ") dststat.length=" + dststat.length);
+ assertTrue(dststat.length - dstfilecount <= filelimit);
+ ContentSummary summary = fs.getContentSummary(dstrootpath);
+ System.out.println(i + ") summary.getLength()=" + summary.getLength());
+ assertTrue(summary.getLength() - dstsize <= sizelimit);
+ assertTrue(checkFiles(nnUri, dstrootdir, srcs, true));
+ dstfilecount = dststat.length;
+ dstsize = summary.getLength();
+ }
+
+ deldir(nnUri, dstrootdir);
+ deldir(nnUri, srcrootdir);
+ }
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java Wed Aug 13 16:35:05 2008
@@ -104,4 +104,18 @@
assertEquals(STR_WITH_BOTH2,
StringUtils.unEscapeString(ESCAPED_STR_WITH_BOTH2));
}
+
+ public void testTraditionalBinaryPrefix() throws Exception {
+ String[] symbol = {"k", "m", "g", "t", "p", "e"};
+ long m = 1024;
+ for(String s : symbol) {
+ assertEquals(0, StringUtils.TraditionalBinaryPrefix.string2long(0 + s));
+ assertEquals(m, StringUtils.TraditionalBinaryPrefix.string2long(1 + s));
+ m *= 1024;
+ }
+
+ assertEquals(0L, StringUtils.TraditionalBinaryPrefix.string2long("0"));
+ assertEquals(-1259520L, StringUtils.TraditionalBinaryPrefix.string2long("-1230k"));
+ assertEquals(956703965184L, StringUtils.TraditionalBinaryPrefix.string2long("891g"));
+ }
}
Modified: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Wed Aug 13 16:35:05 2008
@@ -21,10 +21,12 @@
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Stack;
@@ -66,7 +68,7 @@
* different file-systems.
*/
public class DistCp implements Tool {
- private static final Log LOG = LogFactory.getLog(DistCp.class);
+ public static final Log LOG = LogFactory.getLog(DistCp.class);
private static final String NAME = "distcp";
@@ -86,20 +88,32 @@
"\n-overwrite Overwrite destination" +
"\n-update Overwrite if src size different from dst size" +
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
- "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
+ "\n-filelimit <n> Limit the total number of files to be <= n" +
+ "\n-sizelimit <n> Limit the total size to be <= n bytes" +
+
+ "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
"\n interpreted as an isomorphic update to an existing directory." +
"\nFor example:" +
"\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
"\"hdfs://B:8020/user/foo/baz\"\n" +
"\n would update all descendants of 'baz' also in 'bar'; it would " +
- "\n *not* update /user/foo/baz/bar\n";
+ "\n *not* update /user/foo/baz/bar" +
+ "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
+ "\n specified with symbolic representation. For examples," +
+ "\n 1230k = 1230 * 1024 = 1259520" +
+ "\n 891g = 891 * 1024^3 = 956703965184" +
+
+ "\n";
+
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
private static final int MAX_MAPS_PER_NODE = 20;
private static final int SYNC_FILE_MAX = 10;
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
static enum Options {
+ FILE_LIMIT("-filelimit", NAME + ".limit.file"),
+ SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
PRESERVE_STATUS("-p", NAME + ".preserve.status"),
OVERWRITE("-overwrite", NAME + ".overwrite.always"),
@@ -111,6 +125,17 @@
this.cmd = cmd;
this.propertyname = propertyname;
}
+
+ private long parseLong(String[] args, int offset) {
+ if (offset == args.length) {
+ throw new IllegalArgumentException("<n> not specified in " + cmd);
+ }
+ long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
+ if (n <= 0) {
+ throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
+ }
+ return n;
+ }
}
static enum FileAttribute {
BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
@@ -303,9 +328,11 @@
* Right now, this merely checks that the src and dst len are not equal.
* This should be improved on once modification times, CRCs, etc. can
* be meaningful in this context.
+ * @throws IOException
*/
- private boolean needsUpdate(FileStatus src, FileStatus dst) {
- return update && src.getLen() != dst.getLen();
+ private boolean needsUpdate(FileStatus src, FileSystem dstfs, Path dstpath
+ ) throws IOException {
+ return update && !sameFile(src, dstfs, dstpath);
}
private FSDataOutputStream create(Path f, Reporter reporter,
@@ -355,7 +382,7 @@
}
if (destFileSys.exists(absdst) && !overwrite
- && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
+ && !needsUpdate(srcstat, destFileSys, absdst)) {
outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
++skipcount;
reporter.incrCounter(Counter.SKIP, 1);
@@ -567,7 +594,10 @@
EnumSet<Options> flags = ignoreReadFailures
? EnumSet.of(Options.IGNORE_READ_FAILURES)
: EnumSet.noneOf(Options.class);
- copy(conf, tmp, new Path(destPath), logPath, flags, null);
+
+ final Path dst = new Path(destPath);
+ copy(conf, new Arguments(tmp, dst, logPath, flags, null,
+ Long.MAX_VALUE, Long.MAX_VALUE));
}
/** Sanity check for srcPath */
@@ -587,28 +617,24 @@
/**
* Driver to copy srcPath to destPath depending on required protocol.
- * @param srcPaths list of source paths
- * @param destPath Destination path
- * @param logPath Log output directory
- * @param flags Command-line flags
+ * @param args arguments
*/
- static void copy(Configuration conf, List<Path> srcPaths,
- Path destPath, Path logPath, EnumSet<Options> flags,
- String presevedAttributes) throws IOException {
- LOG.info("srcPaths=" + srcPaths);
- LOG.info("destPath=" + destPath);
- checkSrcPath(conf, srcPaths);
+ static void copy(final Configuration conf, final Arguments args
+ ) throws IOException {
+ LOG.info("srcPaths=" + args.srcs);
+ LOG.info("destPath=" + args.dst);
+ checkSrcPath(conf, args.srcs);
JobConf job = createJobConf(conf);
- if (presevedAttributes != null) {
- job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
+ if (args.preservedAttributes != null) {
+ job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
}
//Initialize the mapper
try {
- setup(conf, job, srcPaths, destPath, logPath, flags);
+ setup(conf, job, args);
JobClient.runJob(job);
- finalize(conf, job, destPath, presevedAttributes);
+ finalize(conf, job, args.dst, args.preservedAttributes);
} finally {
//delete tmp
fullyDelete(job.get(TMP_DIR_LABEL), job);
@@ -669,29 +695,50 @@
}
}
- static private class CommandArgument {
+ static private class Arguments {
final List<Path> srcs;
final Path dst;
final Path log;
final EnumSet<Options> flags;
- final String presevedAttributes;
+ final String preservedAttributes;
+ final long filelimit;
+ final long sizelimit;
- CommandArgument(List<Path> srcs, Path dst, Path log,
- EnumSet<Options> flags, String presevedAttributes) {
+ /**
+ * Arguments for distcp
+ * @param srcs List of source paths
+ * @param dst Destination path
+ * @param log Log output directory
+ * @param flags Command-line flags
+ * @param preservedAttributes Preserved attributes
+ * @param filelimit File limit
+ * @param sizelimit Size limit
+ */
+ Arguments(List<Path> srcs, Path dst, Path log,
+ EnumSet<Options> flags, String preservedAttributes,
+ long filelimit, long sizelimit) {
this.srcs = srcs;
this.dst = dst;
this.log = log;
this.flags = flags;
- this.presevedAttributes = presevedAttributes;
+ this.preservedAttributes = preservedAttributes;
+ this.filelimit = filelimit;
+ this.sizelimit = sizelimit;
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("this = " + this);
+ }
}
- static CommandArgument valueOf(String[] args, Configuration conf
+ static Arguments valueOf(String[] args, Configuration conf
) throws IOException {
List<Path> srcs = new ArrayList<Path>();
Path dst = null;
Path log = null;
EnumSet<Options> flags = EnumSet.noneOf(Options.class);
String presevedAttributes = null;
+ long filelimit = Long.MAX_VALUE;
+ long sizelimit = Long.MAX_VALUE;
for (int idx = 0; idx < args.length; idx++) {
Options[] opt = Options.values();
@@ -704,6 +751,12 @@
presevedAttributes = args[idx].substring(2);
FileAttribute.parse(presevedAttributes); //validation
}
+ else if (opt[i] == Options.FILE_LIMIT) {
+ filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
+ }
+ else if (opt[i] == Options.SIZE_LIMIT) {
+ sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
+ }
} else if ("-f".equals(args[idx])) {
if (++idx == args.length) {
throw new IllegalArgumentException("urilist_uri not specified in -f");
@@ -741,7 +794,21 @@
if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
throw new IllegalArgumentException("Conflicting overwrite policies");
}
- return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
+ return new Arguments(srcs, dst, log, flags, presevedAttributes,
+ filelimit, sizelimit);
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return getClass().getName() + "{"
+ + "\n srcs = " + srcs
+ + "\n dst = " + dst
+ + "\n log = " + log
+ + "\n flags = " + flags
+ + "\n preservedAttributes = " + preservedAttributes
+ + "\n filelimit = " + filelimit
+ + "\n sizelimit = " + sizelimit
+ + "\n}";
}
}
@@ -755,8 +822,7 @@
*/
public int run(String[] args) throws Exception {
try {
- CommandArgument p = CommandArgument.valueOf(args, conf);
- copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
+ copy(conf, Arguments.valueOf(args, conf));
return 0;
} catch (IllegalArgumentException e) {
System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
@@ -858,61 +924,55 @@
return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
}
- private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
- boolean update = flags.contains(Options.UPDATE);
- boolean overwrite = !update && flags.contains(Options.OVERWRITE);
- jobConf.setBoolean(Options.UPDATE.propertyname, update);
- jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
- jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
- flags.contains(Options.IGNORE_READ_FAILURES));
- jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
- flags.contains(Options.PRESERVE_STATUS));
- return update || overwrite;
- }
-
/**
* Initialize DFSCopyFileMapper specific job-configuration.
* @param conf : The dfs/mapred configuration.
* @param jobConf : The handle to the jobConf object to be initialized.
- * @param srcPaths : The source URIs.
- * @param destPath : The destination URI.
- * @param logPath : Log output directory
- * @param flags : Command-line flags
+ * @param args Arguments
*/
private static void setup(Configuration conf, JobConf jobConf,
- List<Path> srcPaths, final Path destPath,
- Path logPath, EnumSet<Options> flags)
+ final Arguments args)
throws IOException {
- jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
- final boolean updateORoverwrite = setBooleans(jobConf, flags);
+ jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());
+
+ //set boolean values
+ final boolean update = args.flags.contains(Options.UPDATE);
+ final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
+ jobConf.setBoolean(Options.UPDATE.propertyname, update);
+ jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
+ jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
+ args.flags.contains(Options.IGNORE_READ_FAILURES));
+ jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
+ args.flags.contains(Options.PRESERVE_STATUS));
final String randomId = getRandomId();
JobClient jClient = new JobClient(jobConf);
Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
- FileSystem dstfs = destPath.getFileSystem(conf);
- boolean dstExists = dstfs.exists(destPath);
+ FileSystem dstfs = args.dst.getFileSystem(conf);
+ boolean dstExists = dstfs.exists(args.dst);
boolean dstIsDir = false;
if (dstExists) {
- dstIsDir = dstfs.getFileStatus(destPath).isDir();
+ dstIsDir = dstfs.getFileStatus(args.dst).isDir();
}
// default logPath
+ Path logPath = args.log;
if (logPath == null) {
String filename = "_distcp_logs_" + randomId;
if (!dstExists || !dstIsDir) {
- Path parent = destPath.getParent();
+ Path parent = args.dst.getParent();
if (!dstfs.exists(parent)) {
dstfs.mkdirs(parent);
}
logPath = new Path(parent, filename);
} else {
- logPath = new Path(destPath, filename);
+ logPath = new Path(args.dst, filename);
}
}
FileOutputFormat.setOutputPath(jobConf, logPath);
-
+
// create src list, dst list
FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
@@ -937,45 +997,69 @@
// and we've only a single src directory OR we're updating/overwriting
// the contents of the destination directory.
final boolean special =
- (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
+ (args.srcs.size() == 1 && !dstExists) || update || overwrite;
int srcCount = 0, cnsyncf = 0, dirsyn = 0;
- long cbsize = 0L, cbsyncs = 0L;
+ long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
+ boolean exceededlimit = false;
try {
- for (Path src : srcPaths) {
- FileSystem fs = src.getFileSystem(conf);
- FileStatus srcfilestat = fs.getFileStatus(src);
+ for(Iterator<Path> srcItr = args.srcs.iterator();
+ !exceededlimit && srcItr.hasNext(); ) {
+ final Path src = srcItr.next();
+ FileSystem srcfs = src.getFileSystem(conf);
+ FileStatus srcfilestat = srcfs.getFileStatus(src);
Path root = special && srcfilestat.isDir()? src: src.getParent();
if (srcfilestat.isDir()) {
++srcCount;
}
Stack<FileStatus> pathstack = new Stack<FileStatus>();
- for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
+ for(pathstack.push(srcfilestat); !exceededlimit && !pathstack.empty(); ) {
FileStatus cur = pathstack.pop();
- for(FileStatus child : fs.listStatus(cur.getPath())) {
+ FileStatus[] children = srcfs.listStatus(cur.getPath());
+ for(int i = 0; !exceededlimit && i < children.length; i++) {
+ boolean skipfile = false;
+ final FileStatus child = children[i];
+ final String dst = makeRelative(root, child.getPath());
++srcCount;
if (child.isDir()) {
pathstack.push(child);
}
else {
- ++cnsyncf;
- cbsyncs += child.getLen();
- cbsize += child.getLen();
-
- if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
- src_writer.sync();
- dst_writer.sync();
- cnsyncf = 0;
- cbsyncs = 0L;
+ //skip file if the src and the dst files are the same.
+ final Path absdst = new Path(args.dst, dst);
+ skipfile = update && sameFile(child, dstfs, absdst);
+
+ if (!skipfile) {
+ ++fileCount;
+ byteCount += child.getLen();
+
+ exceededlimit |= fileCount > args.filelimit
+ || byteCount > args.sizelimit;
+
+ if (!exceededlimit) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("adding file " + child.getPath());
+ }
+
+ ++cnsyncf;
+ cbsyncs += child.getLen();
+ if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+ src_writer.sync();
+ dst_writer.sync();
+ cnsyncf = 0;
+ cbsyncs = 0L;
+ }
+ }
}
}
- String dst = makeRelative(root, child.getPath());
- src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
- new FilePair(child, dst));
- dst_writer.append(new Text(dst),
- new Text(child.getPath().toString()));
+ if (!skipfile && !exceededlimit) {
+ src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
+ new FilePair(child, dst));
+ dst_writer.append(new Text(dst),
+ new Text(child.getPath().toString()));
+ }
}
if (cur.isDir()) {
@@ -995,9 +1079,9 @@
}
// create dest path dir if copying > 1 file
- if (!dstfs.exists(destPath)) {
- if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
- throw new IOException("Failed to create" + destPath);
+ if (!dstfs.exists(args.dst)) {
+ if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
+ throw new IOException("Failed to create" + args.dst);
}
}
@@ -1006,14 +1090,30 @@
Path tmpDir = new Path(
(dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
- destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
+ args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
LOG.info("srcCount=" + srcCount);
jobConf.setInt(SRC_COUNT_LABEL, srcCount);
- jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
- setMapCount(cbsize, jobConf);
+ jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
+ setMapCount(byteCount, jobConf);
+ }
+
+ /**
+ * Check whether the src and the dst are the same.
+ * Two files are considered as the same if they have the same size.
+ */
+ static private boolean sameFile(FileStatus src, FileSystem dstfs, Path dstpath
+ ) throws IOException {
+ FileStatus dst = null;
+ try {
+ dst = dstfs.getFileStatus(dstpath);
+ } catch (FileNotFoundException fnfe) {
+ return false;
+ }
+ return src.getLen() == dst.getLen();
}
+ /** Check whether the file list have duplication. */
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
Configuration conf) throws IOException {
SequenceFile.Reader in = null;