You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/08/14 01:35:06 UTC

svn commit: r685727 - in /hadoop/core/trunk: CHANGES.txt src/core/org/apache/hadoop/util/StringUtils.java src/test/org/apache/hadoop/fs/TestCopyFiles.java src/test/org/apache/hadoop/util/TestStringUtils.java src/tools/org/apache/hadoop/tools/DistCp.java

Author: cdouglas
Date: Wed Aug 13 16:35:05 2008
New Revision: 685727

URL: http://svn.apache.org/viewvc?rev=685727&view=rev
Log:
HADOOP-3873. Add -filelimit and -sizelimit options to distcp to cap the number
of files/bytes copied in a particular run to support incremental updates and
mirroring. Contributed by TszWo (Nicholas), SZE.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 13 16:35:05 2008
@@ -82,6 +82,10 @@
     analysis framework. (Jerome Boulon, Andy Konwinski, Ari Rabkin, 
     and Eric Yang)
 
+    HADOOP-3873. Add -filelimit and -sizelimit options to distcp to cap the
+    number of files/bytes copied in a particular run to support incremental
+    updates and mirroring. (TszWo (Nicholas), SZE via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java Wed Aug 13 16:35:05 2008
@@ -466,7 +466,7 @@
    * @param args arguments
    * @param LOG the target log object
    */
-  public static void startupShutdownMessage(Class clazz, String[] args,
+  public static void startupShutdownMessage(Class<?> clazz, String[] args,
                                      final org.apache.commons.logging.Log LOG) {
     final String hostname = getHostname();
     final String classname = clazz.getSimpleName();
@@ -490,4 +490,62 @@
       }
     });
   }
+
+  /**
+   * The traditional binary prefixes, kilo, mega, ..., exa,
+   * which can be represented by a 64-bit integer.
+   * TraditionalBinaryPrefix symbols are case insensitive.
+   */
+  public static enum TraditionalBinaryPrefix {
+    KILO(1024),
+    MEGA(KILO.value << 10),
+    GIGA(MEGA.value << 10),
+    TERA(GIGA.value << 10),
+    PETA(TERA.value << 10),
+    EXA(PETA.value << 10);
+
+    public final long value;
+    public final long symbol;
+
+    TraditionalBinaryPrefix(long value) {
+      this.value = value;
+      this.symbol = toString().charAt(0);
+    }
+
+    /**
+     * @return The TraditionalBinaryPrefix object corresponding to the symbol.
+     */
+    public static TraditionalBinaryPrefix valueOf(char symbol) {
+      symbol = Character.toUpperCase(symbol);
+      for(TraditionalBinaryPrefix prefix : TraditionalBinaryPrefix.values()) {
+        if (symbol == prefix.symbol) {
+          return prefix;
+        }
+      }
+      throw new IllegalArgumentException("Unknown symbol '" + symbol + "'");
+    }
+
+    /**
+     * Convert a string to long.
+     * The input string is first trimmed
+     * and then it is parsed with traditional binary prefix.
+     *
+     * For example,
+     * "-1230k" will be converted to -1230 * 1024 = -1259520;
+     * "891g" will be converted to 891 * 1024^3 = 956703965184;
+     *
+     * @param s input string
+     * @return a long value represented by the input string.
+     */
+    public static long string2long(String s) {
+      s = s.trim();
+      final int lastpos = s.length() - 1;
+      final char lastchar = s.charAt(lastpos);
+      if (Character.isDigit(lastchar))
+        return Long.parseLong(s);
+      else
+        return TraditionalBinaryPrefix.valueOf(lastchar).value
+               * Long.parseLong(s.substring(0, lastpos));
+    }
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Wed Aug 13 16:35:05 2008
@@ -18,15 +18,24 @@
 
 package org.apache.hadoop.fs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.tools.DistCp;
@@ -37,9 +46,17 @@
  * A JUnit test for copying files recursively.
  */
 public class TestCopyFiles extends TestCase {
+  {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
+        ).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
+  }
   
   static final URI LOCAL_FS = URI.create("file:///");
   
+  private static final Random RAN = new Random();
   private static final int NFILES = 20;
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
@@ -123,9 +140,15 @@
     return createFile(root, fs, -1);
   }
 
-  /** check if the files have been copied correctly. */
-  private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
+  /** Same as checkFiles(fsname, topdir, files, false); */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
     throws IOException {
+    return checkFiles(fsname, topdir, files, false);
+  }
+
+  /** check if the files have been copied correctly. */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files,
+      boolean existingOnly) throws IOException {
     
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.getNamed(fsname, conf);
@@ -133,20 +156,28 @@
     
     for (int idx = 0; idx < files.length; idx++) {
       Path fPath = new Path(root, files[idx].getName());
-      FSDataInputStream in = fs.open(fPath);
-      byte[] toRead = new byte[files[idx].getSize()];
-      byte[] toCompare = new byte[files[idx].getSize()];
-      Random rb = new Random(files[idx].getSeed());
-      rb.nextBytes(toCompare);
-      assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
-      in.close();
-      for (int i = 0; i < toRead.length; i++) {
-        if (toRead[i] != toCompare[i]) {
-          return false;
+      try {
+        fs.getFileStatus(fPath);
+        FSDataInputStream in = fs.open(fPath);
+        byte[] toRead = new byte[files[idx].getSize()];
+        byte[] toCompare = new byte[files[idx].getSize()];
+        Random rb = new Random(files[idx].getSeed());
+        rb.nextBytes(toCompare);
+        assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
+        in.close();
+        for (int i = 0; i < toRead.length; i++) {
+          if (toRead[i] != toCompare[i]) {
+            return false;
+          }
+        }
+        toRead = null;
+        toCompare = null;
+      }
+      catch(FileNotFoundException fnfe) {
+        if (!existingOnly) {
+          throw fnfe;
         }
       }
-      toRead = null;
-      toCompare = null;
     }
     
     return true;
@@ -176,14 +207,24 @@
 
   private static FileStatus[] getFileStatus(String namenode,
       String topdir, MyFile[] files) throws IOException {
+    return getFileStatus(namenode, topdir, files, false);
+  }
+  private static FileStatus[] getFileStatus(String namenode,
+      String topdir, MyFile[] files, boolean existingOnly) throws IOException {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.getNamed(namenode, conf);
     Path root = new Path(topdir);
-    FileStatus[] ret = new FileStatus[NFILES];
+    List<FileStatus> statuses = new ArrayList<FileStatus>();
     for (int idx = 0; idx < NFILES; ++idx) {
-      ret[idx] = fs.getFileStatus(new Path(root, files[idx].getName()));
+      try {
+        statuses.add(fs.getFileStatus(new Path(root, files[idx].getName())));
+      } catch(FileNotFoundException fnfe) {
+        if (!existingOnly) {
+          throw fnfe;
+        }
+      }
     }
-    return ret;
+    return statuses.toArray(new FileStatus[statuses.size()]);
   }
 
   private static boolean checkUpdate(FileStatus[] old, String namenode,
@@ -567,4 +608,85 @@
     }
   }
 
+  public void testLimits() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      final String nnUri = FileSystem.getDefaultUri(conf).toString();
+      final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+      final DistCp distcp = new DistCp(conf);
+
+      final String srcrootdir =  "/src_root";
+      final Path srcrootpath = new Path(srcrootdir); 
+      final String dstrootdir =  "/dst_root";
+      final Path dstrootpath = new Path(dstrootdir); 
+
+      {//test -filelimit
+        MyFile[] files = createFiles(URI.create(nnUri), srcrootdir);
+        int filelimit = files.length / 2;
+        System.out.println("filelimit=" + filelimit);
+
+        ToolRunner.run(distcp,
+            new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
+        
+        FileStatus[] dststat = getFileStatus(nnUri, dstrootdir, files, true);
+        assertEquals(filelimit, dststat.length);
+        deldir(nnUri, dstrootdir);
+        deldir(nnUri, srcrootdir);
+      }
+
+      {//test -sizelimit
+        createFiles(URI.create(nnUri), srcrootdir);
+        long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2;
+        System.out.println("sizelimit=" + sizelimit);
+
+        ToolRunner.run(distcp,
+            new String[]{"-sizelimit", ""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir});
+        
+        ContentSummary summary = fs.getContentSummary(dstrootpath);
+        System.out.println("summary=" + summary);
+        assertTrue(summary.getLength() <= sizelimit);
+        deldir(nnUri, dstrootdir);
+        deldir(nnUri, srcrootdir);
+      }
+
+      {//test update
+        final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir);
+        final long totalsize = fs.getContentSummary(srcrootpath).getLength();
+        System.out.println("src.length=" + srcs.length);
+        System.out.println("totalsize =" + totalsize);
+        fs.mkdirs(dstrootpath);
+        final int parts = RAN.nextInt(NFILES/3 - 1) + 2;
+        final int filelimit = srcs.length/parts;
+        final long sizelimit = totalsize/parts;
+        System.out.println("filelimit=" + filelimit);
+        System.out.println("sizelimit=" + sizelimit);
+        System.out.println("parts    =" + parts);
+        final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit,
+            "-update", nnUri+srcrootdir, nnUri+dstrootdir};
+
+        int dstfilecount = 0;
+        long dstsize = 0;
+        for(int i = 0; i <= parts; i++) {
+          ToolRunner.run(distcp, args);
+        
+          FileStatus[] dststat = getFileStatus(nnUri, dstrootdir, srcs, true);
+          System.out.println(i + ") dststat.length=" + dststat.length);
+          assertTrue(dststat.length - dstfilecount <= filelimit);
+          ContentSummary summary = fs.getContentSummary(dstrootpath);
+          System.out.println(i + ") summary.getLength()=" + summary.getLength());
+          assertTrue(summary.getLength() - dstsize <= sizelimit);
+          assertTrue(checkFiles(nnUri, dstrootdir, srcs, true));
+          dstfilecount = dststat.length;
+          dstsize = summary.getLength();
+        }
+
+        deldir(nnUri, dstrootdir);
+        deldir(nnUri, srcrootdir);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestStringUtils.java Wed Aug 13 16:35:05 2008
@@ -104,4 +104,18 @@
     assertEquals(STR_WITH_BOTH2,
         StringUtils.unEscapeString(ESCAPED_STR_WITH_BOTH2));
   }
+  
+  public void testTraditionalBinaryPrefix() throws Exception {
+    String[] symbol = {"k", "m", "g", "t", "p", "e"};
+    long m = 1024;
+    for(String s : symbol) {
+      assertEquals(0, StringUtils.TraditionalBinaryPrefix.string2long(0 + s));
+      assertEquals(m, StringUtils.TraditionalBinaryPrefix.string2long(1 + s));
+      m *= 1024;
+    }
+    
+    assertEquals(0L, StringUtils.TraditionalBinaryPrefix.string2long("0"));
+    assertEquals(-1259520L, StringUtils.TraditionalBinaryPrefix.string2long("-1230k"));
+    assertEquals(956703965184L, StringUtils.TraditionalBinaryPrefix.string2long("891g"));
+  }
 }

Modified: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=685727&r1=685726&r2=685727&view=diff
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Wed Aug 13 16:35:05 2008
@@ -21,10 +21,12 @@
 import java.io.BufferedReader;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Stack;
@@ -66,7 +68,7 @@
  * different file-systems.
  */
 public class DistCp implements Tool {
-  private static final Log LOG = LogFactory.getLog(DistCp.class);
+  public static final Log LOG = LogFactory.getLog(DistCp.class);
 
   private static final String NAME = "distcp";
 
@@ -86,20 +88,32 @@
     "\n-overwrite             Overwrite destination" +
     "\n-update                Overwrite if src size different from dst size" +
     "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
-    "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
+    "\n-filelimit <n>         Limit the total number of files to be <= n" +
+    "\n-sizelimit <n>         Limit the total size to be <= n bytes" +
+    
+    "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
     "\n      interpreted as an isomorphic update to an existing directory." +
     "\nFor example:" +
     "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
     "\"hdfs://B:8020/user/foo/baz\"\n" +
     "\n     would update all descendants of 'baz' also in 'bar'; it would " +
-    "\n     *not* update /user/foo/baz/bar\n";
+    "\n     *not* update /user/foo/baz/bar" + 
 
+    "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
+    "\n     specified with symbolic representation.  For examples," +
+    "\n       1230k = 1230 * 1024 = 1259520" +
+    "\n       891g = 891 * 1024^3 = 956703965184" +
+    
+    "\n";
+  
   private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
   private static final int MAX_MAPS_PER_NODE = 20;
   private static final int SYNC_FILE_MAX = 10;
 
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
   static enum Options {
+    FILE_LIMIT("-filelimit", NAME + ".limit.file"),
+    SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
     IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
     PRESERVE_STATUS("-p", NAME + ".preserve.status"),
     OVERWRITE("-overwrite", NAME + ".overwrite.always"),
@@ -111,6 +125,17 @@
       this.cmd = cmd;
       this.propertyname = propertyname;
     }
+    
+    private long parseLong(String[] args, int offset) {
+      if (offset ==  args.length) {
+        throw new IllegalArgumentException("<n> not specified in " + cmd);
+      }
+      long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
+      if (n <= 0) {
+        throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
+      }
+      return n;
+    }
   }
   static enum FileAttribute {
     BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
@@ -303,9 +328,11 @@
      * Right now, this merely checks that the src and dst len are not equal. 
      * This should be improved on once modification times, CRCs, etc. can
      * be meaningful in this context.
+     * @throws IOException if the status of the destination file cannot be obtained
      */
-    private boolean needsUpdate(FileStatus src, FileStatus dst) {
-      return update && src.getLen() != dst.getLen();
+    private boolean needsUpdate(FileStatus src, FileSystem dstfs, Path dstpath
+        ) throws IOException {
+      return update && !sameFile(src, dstfs, dstpath);
     }
     
     private FSDataOutputStream create(Path f, Reporter reporter,
@@ -355,7 +382,7 @@
       }
 
       if (destFileSys.exists(absdst) && !overwrite
-          && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
+          && !needsUpdate(srcstat, destFileSys, absdst)) {
         outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
         ++skipcount;
         reporter.incrCounter(Counter.SKIP, 1);
@@ -567,7 +594,10 @@
     EnumSet<Options> flags = ignoreReadFailures
       ? EnumSet.of(Options.IGNORE_READ_FAILURES)
       : EnumSet.noneOf(Options.class);
-    copy(conf, tmp, new Path(destPath), logPath, flags, null);
+
+    final Path dst = new Path(destPath);
+    copy(conf, new Arguments(tmp, dst, logPath, flags, null,
+        Long.MAX_VALUE, Long.MAX_VALUE));
   }
 
   /** Sanity check for srcPath */
@@ -587,28 +617,24 @@
 
   /**
    * Driver to copy srcPath to destPath depending on required protocol.
-   * @param srcPaths list of source paths
-   * @param destPath Destination path
-   * @param logPath Log output directory
-   * @param flags Command-line flags
+   * @param args arguments
    */
-  static void copy(Configuration conf, List<Path> srcPaths,
-      Path destPath, Path logPath, EnumSet<Options> flags,
-      String presevedAttributes) throws IOException {
-    LOG.info("srcPaths=" + srcPaths);
-    LOG.info("destPath=" + destPath);
-    checkSrcPath(conf, srcPaths);
+  static void copy(final Configuration conf, final Arguments args
+      ) throws IOException {
+    LOG.info("srcPaths=" + args.srcs);
+    LOG.info("destPath=" + args.dst);
+    checkSrcPath(conf, args.srcs);
 
     JobConf job = createJobConf(conf);
-    if (presevedAttributes != null) {
-      job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
+    if (args.preservedAttributes != null) {
+      job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
     }
     
     //Initialize the mapper
     try {
-      setup(conf, job, srcPaths, destPath, logPath, flags);
+      setup(conf, job, args);
       JobClient.runJob(job);
-      finalize(conf, job, destPath, presevedAttributes);
+      finalize(conf, job, args.dst, args.preservedAttributes);
     } finally {
       //delete tmp
       fullyDelete(job.get(TMP_DIR_LABEL), job);
@@ -669,29 +695,50 @@
     }
   }
 
-  static private class CommandArgument {
+  static private class Arguments {
     final List<Path> srcs;
     final Path dst;
     final Path log;
     final EnumSet<Options> flags;
-    final String presevedAttributes;
+    final String preservedAttributes;
+    final long filelimit;
+    final long sizelimit;
     
-    CommandArgument(List<Path> srcs, Path dst, Path log,
-        EnumSet<Options> flags, String presevedAttributes) {
+    /**
+     * Arguments for distcp
+     * @param srcs List of source paths
+     * @param dst Destination path
+     * @param log Log output directory
+     * @param flags Command-line flags
+     * @param preservedAttributes Preserved attributes 
+     * @param filelimit File limit
+     * @param sizelimit Size limit
+     */
+    Arguments(List<Path> srcs, Path dst, Path log,
+        EnumSet<Options> flags, String preservedAttributes,
+        long filelimit, long sizelimit) {
       this.srcs = srcs;
       this.dst = dst;
       this.log = log;
       this.flags = flags;
-      this.presevedAttributes = presevedAttributes;      
+      this.preservedAttributes = preservedAttributes;
+      this.filelimit = filelimit;
+      this.sizelimit = sizelimit;
+      
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("this = " + this);
+      }
     }
 
-    static CommandArgument valueOf(String[] args, Configuration conf
+    static Arguments valueOf(String[] args, Configuration conf
         ) throws IOException {
       List<Path> srcs = new ArrayList<Path>();
       Path dst = null;
       Path log = null;
       EnumSet<Options> flags = EnumSet.noneOf(Options.class);
       String presevedAttributes = null;
+      long filelimit = Long.MAX_VALUE;
+      long sizelimit = Long.MAX_VALUE;
 
       for (int idx = 0; idx < args.length; idx++) {
         Options[] opt = Options.values();
@@ -704,6 +751,12 @@
             presevedAttributes =  args[idx].substring(2);         
             FileAttribute.parse(presevedAttributes); //validation
           }
+          else if (opt[i] == Options.FILE_LIMIT) {
+            filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
+          }
+          else if (opt[i] == Options.SIZE_LIMIT) {
+            sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
+          }
         } else if ("-f".equals(args[idx])) {
           if (++idx ==  args.length) {
             throw new IllegalArgumentException("urilist_uri not specified in -f");
@@ -741,7 +794,21 @@
       if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
         throw new IllegalArgumentException("Conflicting overwrite policies");
       }
-      return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
+      return new Arguments(srcs, dst, log, flags, presevedAttributes,
+          filelimit, sizelimit);
+    }
+    
+    /** {@inheritDoc} */
+    public String toString() {
+      return getClass().getName() + "{"
+          + "\n  srcs = " + srcs 
+          + "\n  dst = " + dst 
+          + "\n  log = " + log 
+          + "\n  flags = " + flags
+          + "\n  preservedAttributes = " + preservedAttributes 
+          + "\n  filelimit = " + filelimit 
+          + "\n  sizelimit = " + sizelimit
+          + "\n}"; 
     }
   }
 
@@ -755,8 +822,7 @@
    */
   public int run(String[] args) throws Exception {
     try {
-      CommandArgument p = CommandArgument.valueOf(args, conf);
-      copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
+      copy(conf, Arguments.valueOf(args, conf));
       return 0;
     } catch (IllegalArgumentException e) {
       System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
@@ -858,61 +924,55 @@
     return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
   }
 
-  private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
-    boolean update = flags.contains(Options.UPDATE);
-    boolean overwrite = !update && flags.contains(Options.OVERWRITE);
-    jobConf.setBoolean(Options.UPDATE.propertyname, update);
-    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
-    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
-        flags.contains(Options.IGNORE_READ_FAILURES));
-    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
-        flags.contains(Options.PRESERVE_STATUS));
-    return update || overwrite;
-  }
-
   /**
    * Initialize DFSCopyFileMapper specific job-configuration.
    * @param conf : The dfs/mapred configuration.
    * @param jobConf : The handle to the jobConf object to be initialized.
-   * @param srcPaths : The source URIs.
-   * @param destPath : The destination URI.
-   * @param logPath : Log output directory
-   * @param flags : Command-line flags
+   * @param args Arguments
    */
   private static void setup(Configuration conf, JobConf jobConf,
-                            List<Path> srcPaths, final Path destPath,
-                            Path logPath, EnumSet<Options> flags)
+                            final Arguments args)
       throws IOException {
-    jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
-    final boolean updateORoverwrite = setBooleans(jobConf, flags);
+    jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());
+
+    //set boolean values
+    final boolean update = args.flags.contains(Options.UPDATE);
+    final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
+    jobConf.setBoolean(Options.UPDATE.propertyname, update);
+    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
+    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
+        args.flags.contains(Options.IGNORE_READ_FAILURES));
+    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
+        args.flags.contains(Options.PRESERVE_STATUS));
 
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobConf);
     Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
-    FileSystem dstfs = destPath.getFileSystem(conf);
-    boolean dstExists = dstfs.exists(destPath);
+    FileSystem dstfs = args.dst.getFileSystem(conf);
+    boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;
     if (dstExists) {
-      dstIsDir = dstfs.getFileStatus(destPath).isDir();
+      dstIsDir = dstfs.getFileStatus(args.dst).isDir();
     }
 
     // default logPath
+    Path logPath = args.log; 
     if (logPath == null) {
       String filename = "_distcp_logs_" + randomId;
       if (!dstExists || !dstIsDir) {
-        Path parent = destPath.getParent();
+        Path parent = args.dst.getParent();
         if (!dstfs.exists(parent)) {
           dstfs.mkdirs(parent);
         }
         logPath = new Path(parent, filename);
       } else {
-        logPath = new Path(destPath, filename);
+        logPath = new Path(args.dst, filename);
       }
     }
     FileOutputFormat.setOutputPath(jobConf, logPath);
-    
+
     // create src list, dst list
     FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
 
@@ -937,45 +997,69 @@
     // and we've only a single src directory OR we're updating/overwriting
     // the contents of the destination directory.
     final boolean special =
-      (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
+      (args.srcs.size() == 1 && !dstExists) || update || overwrite;
     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
-    long cbsize = 0L, cbsyncs = 0L;
+    long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
+    boolean exceededlimit = false;
     try {
-      for (Path src : srcPaths) {
-        FileSystem fs = src.getFileSystem(conf);
-        FileStatus srcfilestat = fs.getFileStatus(src);
+      for(Iterator<Path> srcItr = args.srcs.iterator();
+          !exceededlimit && srcItr.hasNext(); ) {
+        final Path src = srcItr.next();
+        FileSystem srcfs = src.getFileSystem(conf);
+        FileStatus srcfilestat = srcfs.getFileStatus(src);
         Path root = special && srcfilestat.isDir()? src: src.getParent();
         if (srcfilestat.isDir()) {
           ++srcCount;
         }
 
         Stack<FileStatus> pathstack = new Stack<FileStatus>();
-        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
+        for(pathstack.push(srcfilestat); !exceededlimit && !pathstack.empty(); ) {
           FileStatus cur = pathstack.pop();
-          for(FileStatus child : fs.listStatus(cur.getPath())) {
+          FileStatus[] children = srcfs.listStatus(cur.getPath());
+          for(int i = 0; !exceededlimit && i < children.length; i++) {
+            boolean skipfile = false;
+            final FileStatus child = children[i]; 
+            final String dst = makeRelative(root, child.getPath());
             ++srcCount;
 
             if (child.isDir()) {
               pathstack.push(child);
             }
             else {
-              ++cnsyncf;
-              cbsyncs += child.getLen();
-              cbsize += child.getLen();
-
-              if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
-                src_writer.sync();
-                dst_writer.sync();
-                cnsyncf = 0;
-                cbsyncs = 0L;
+              //skip file if the src and the dst files are the same.
+              final Path absdst = new Path(args.dst, dst);
+              skipfile = update && sameFile(child, dstfs, absdst);
+              
+              if (!skipfile) {
+                ++fileCount;
+                byteCount += child.getLen();
+  
+                exceededlimit |= fileCount > args.filelimit
+                                 || byteCount > args.sizelimit;
+
+                if (!exceededlimit) {
+                  if (LOG.isTraceEnabled()) {
+                    LOG.trace("adding file " + child.getPath());
+                  }
+
+                  ++cnsyncf;
+                  cbsyncs += child.getLen();
+                  if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+                    src_writer.sync();
+                    dst_writer.sync();
+                    cnsyncf = 0;
+                    cbsyncs = 0L;
+                  }
+                }
               }
             }
 
-            String dst = makeRelative(root, child.getPath());
-            src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
-                new FilePair(child, dst));
-            dst_writer.append(new Text(dst),
-                new Text(child.getPath().toString()));
+            if (!skipfile && !exceededlimit) {
+              src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
+                  new FilePair(child, dst));
+              dst_writer.append(new Text(dst),
+                  new Text(child.getPath().toString()));
+            }
           }
 
           if (cur.isDir()) {
@@ -995,9 +1079,9 @@
     }
 
     // create dest path dir if copying > 1 file
-    if (!dstfs.exists(destPath)) {
-      if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
-        throw new IOException("Failed to create" + destPath);
+    if (!dstfs.exists(args.dst)) {
+      if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
+        throw new IOException("Failed to create" + args.dst);
       }
     }
 
@@ -1006,14 +1090,30 @@
 
     Path tmpDir = new Path(
         (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
-        destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
+        args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
     jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
     LOG.info("srcCount=" + srcCount);
     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
-    jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
-    setMapCount(cbsize, jobConf);
+    jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
+    setMapCount(byteCount, jobConf);
+  }
+
+  /**
+   * Check whether the src and the dst are the same.
+   * Two files are considered the same if they have the same size.
+   */
+  static private boolean sameFile(FileStatus src, FileSystem dstfs, Path dstpath
+      ) throws IOException {
+    FileStatus dst = null;
+    try {
+      dst = dstfs.getFileStatus(dstpath);
+    } catch (FileNotFoundException fnfe) {
+      return false;
+    }
+    return src.getLen() == dst.getLen();
   }
 
+  /** Check whether the file list contains duplicates. */
   static private void checkDuplication(FileSystem fs, Path file, Path sorted,
     Configuration conf) throws IOException {
     SequenceFile.Reader in = null;