Posted to common-commits@hadoop.apache.org by yj...@apache.org on 2017/05/25 02:15:03 UTC

[1/4] hadoop git commit: Revert "HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)"

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 bd0138ea0 -> dd552a97b


Revert "HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)"

This reverts commit fe185e2c3a07d4e6a82ed4fd255c98ec8b561ad6.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39474ed5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39474ed5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39474ed5

Branch: refs/heads/branch-2
Commit: 39474ed50d2d84cb0ee5452b5fa5b19494168c94
Parents: bd0138e
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Wed May 24 15:38:50 2017 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Wed May 24 18:54:50 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/tools/DistCpConstants.java    |  6 ---
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  8 ----
 .../org/apache/hadoop/tools/DistCpOptions.java  | 21 +---------
 .../org/apache/hadoop/tools/OptionsParser.java  | 27 +------------
 .../tools/mapred/RetriableFileCopyCommand.java  | 11 ++---
 .../src/site/markdown/DistCp.md.vm              |  1 -
 .../apache/hadoop/tools/TestOptionsParser.java  | 42 +-------------------
 7 files changed, 8 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
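
For context, the reverted change exposed DistCp's copy buffer size through the
distcp.copy.buffer.size property (default 8 * 1024 bytes) and a -copybuffersize
command-line switch; after this revert, RetriableFileCopyCommand falls back to a
hard-coded 8 KB buffer. The following is a minimal, illustrative sketch of the
configuration lookup being removed; the class name is hypothetical, while the
property name and default value are taken from the DistCpConstants diff below.

    import org.apache.hadoop.conf.Configuration;

    public class CopyBufferSizeSketch {
      // Property name and default as defined in the reverted DistCpConstants.
      static final String CONF_LABEL_COPY_BUFFER_SIZE = "distcp.copy.buffer.size";
      static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Before the revert, RetriableFileCopyCommand read the buffer size from
        // configuration like this; after the revert it always uses a fixed
        // 8 * 1024 byte constant.
        int copyBufferSize =
            conf.getInt(CONF_LABEL_COPY_BUFFER_SIZE, COPY_BUFFER_SIZE_DEFAULT);
        System.out.println("copy buffer size = " + copyBufferSize);
      }
    }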


http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 0541b75..6cea583 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -113,10 +113,6 @@ public class DistCpConstants {
   /* DistCp CopyListing class override param */
   public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
 
-  /* DistCp Copy Buffer Size */
-  public static final String CONF_LABEL_COPY_BUFFER_SIZE =
-      "distcp.copy.buffer.size";
-
   /**
    * Conf label for SSL Trust-store location.
    */
@@ -161,6 +157,4 @@ public class DistCpConstants {
   public static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
 
   static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
-
-  public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index 49ec035..e76a48e 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -189,14 +189,6 @@ public enum DistCpOptionSwitch {
           + "system implements concat method")),
 
   /**
-   * Configurable copy buffer size.
-   */
-  COPY_BUFFER_SIZE(DistCpConstants.CONF_LABEL_COPY_BUFFER_SIZE,
-      new Option("copybuffersize", true, "Size of the copy buffer to use. "
-          + "By default <copybuffersize> is "
-          + DistCpConstants.COPY_BUFFER_SIZE_DEFAULT + "B.")),
-
-  /**
    * Specify bandwidth per map in MB
    */
   BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index b3c843f..2efb96b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -104,11 +104,6 @@ public class DistCpOptions {
   // to copy in parallel. Default is 0 and file are not splitted.
   private int blocksPerChunk = 0;
 
-  /**
-   * The copyBufferSize to use in RetriableFileCopyCommand
-   */
-  private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
-
   public static enum FileAttribute{
     REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
 
@@ -179,7 +174,6 @@ public class DistCpOptions {
       this.targetPathExists = that.getTargetPathExists();
       this.filtersFile = that.getFiltersFile();
       this.blocksPerChunk = that.blocksPerChunk;
-      this.copyBufferSize = that.copyBufferSize;
     }
   }
 
@@ -470,7 +464,7 @@ public class DistCpOptions {
   }
 
   /**
-   * Checks if the input attribute should be preserved or not.
+   * Checks if the input attribute should be preserved or not
    *
    * @param attribute - Attribute to check
    * @return True if attribute should be preserved, false otherwise
@@ -646,16 +640,6 @@ public class DistCpOptions {
     return blocksPerChunk > 0;
   }
 
-  public final void setCopyBufferSize(int newCopyBufferSize) {
-    this.copyBufferSize =
-        newCopyBufferSize > 0 ? newCopyBufferSize
-            : DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
-  }
-
-  public int getCopyBufferSize() {
-    return this.copyBufferSize;
-  }
-
   public void validate(DistCpOptionSwitch option, boolean value) {
 
     boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@@ -752,8 +736,6 @@ public class DistCpOptions {
     }
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
         String.valueOf(blocksPerChunk));
-    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
-        String.valueOf(copyBufferSize));
   }
 
   /**
@@ -791,7 +773,6 @@ public class DistCpOptions {
         ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
         ", blocksPerChunk=" + blocksPerChunk +
-        ", copyBufferSize=" + copyBufferSize +
         '}';
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 868ae73..c68102d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -186,8 +186,6 @@ public class OptionsParser {
 
     parseBlocksPerChunk(command, option);
 
-    parseCopyBufferSize(command, option);
-
     return option;
   }
 
@@ -223,29 +221,8 @@ public class OptionsParser {
   }
 
   /**
-   * A helper method to parse copyBufferSize.
-   *
-   * @param command command line arguments
-   */
-  private static void parseCopyBufferSize(CommandLine command,
-      DistCpOptions option) {
-    if (command.hasOption(DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch())) {
-      String copyBufferSizeStr =
-          getVal(command, DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch()
-              .trim());
-      try {
-        int copyBufferSize = Integer.parseInt(copyBufferSizeStr);
-        option.setCopyBufferSize(copyBufferSize);
-      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException("copyBufferSize is invalid: "
-            + copyBufferSizeStr, e);
-      }
-    }
-  }
-
-  /**
-   * parseSizeLimit is a helper method for parsing the deprecated argument
-   * SIZE_LIMIT.
+   * parseSizeLimit is a helper method for parsing the deprecated
+   * argument SIZE_LIMIT.
    *
    * @param command command line arguments
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index ddf2725..58a51af 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.apache.hadoop.tools.util.DistCpUtils;
@@ -54,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class RetriableFileCopyCommand extends RetriableCommand {
 
   private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
+  private static int BUFFER_SIZE = 8 * 1024;
   private boolean skipCrc = false;
   private FileAction action;
 
@@ -169,9 +169,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));
-    int copyBufferSize = context.getConfiguration().getInt(
-        DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
-        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
     final OutputStream outStream;
     if (action == FileAction.OVERWRITE) {
       final short repl = getReplicationFactor(fileAttributes, source,
@@ -180,14 +177,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           targetFS, targetPath);
       FSDataOutputStream out = targetFS.create(targetPath, permission,
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
-          copyBufferSize, repl, blockSize, context,
+          BUFFER_SIZE, repl, blockSize, context,
           getChecksumOpt(fileAttributes, sourceChecksum));
       outStream = new BufferedOutputStream(out);
     } else {
       outStream = new BufferedOutputStream(targetFS.append(targetPath,
-          copyBufferSize));
+          BUFFER_SIZE));
     }
-    return copyBytes(source, sourceOffset, outStream, copyBufferSize,
+    return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
         context);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index 32c71dd..e6cff10 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -240,7 +240,6 @@ Flag              | Description                          | Notes
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
 `-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
-`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
 
 Architecture of DistCp
 ----------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39474ed5/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index d1ef56a..acffb76 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -407,8 +407,7 @@ public class TestOptionsParser {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0, "
-        + "copyBufferSize=8192}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
@@ -774,43 +773,4 @@ public class TestOptionsParser {
         "hdfs://localhost:8020/target/"});
     Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
   }
-
-  @Test
-  public void testParseCopyBufferSize() {
-    DistCpOptions options =
-        OptionsParser.parse(new String[] {
-            "hdfs://localhost:8020/source/first",
-            "hdfs://localhost:8020/target/" });
-    Assert.assertEquals(options.getCopyBufferSize(),
-        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
-
-    options =
-        OptionsParser.parse(new String[] { "-copybuffersize", "0",
-            "hdfs://localhost:8020/source/first",
-            "hdfs://localhost:8020/target/" });
-    Assert.assertEquals(options.getCopyBufferSize(),
-        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
-
-    options =
-        OptionsParser.parse(new String[] { "-copybuffersize", "-1",
-            "hdfs://localhost:8020/source/first",
-            "hdfs://localhost:8020/target/" });
-    Assert.assertEquals(options.getCopyBufferSize(),
-        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
-
-    options =
-        OptionsParser.parse(new String[] { "-copybuffersize", "4194304",
-            "hdfs://localhost:8020/source/first",
-            "hdfs://localhost:8020/target/" });
-    Assert.assertEquals(options.getCopyBufferSize(), 4194304);
-
-    try {
-      OptionsParser
-          .parse(new String[] { "-copybuffersize", "hello",
-              "hdfs://localhost:8020/source/first",
-              "hdfs://localhost:8020/target/" });
-      Assert.fail("Non numberic copybuffersize parsed successfully!");
-    } catch (IllegalArgumentException ignore) {
-    }
-  }
 }




[2/4] hadoop git commit: Revert "HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li."

Posted by yj...@apache.org.
Revert "HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li."

This reverts commit c411adac32cda42433b15979253248336487d977.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97317b1a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97317b1a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97317b1a

Branch: refs/heads/branch-2
Commit: 97317b1aaa289201832013b946b96b4410b7ad8c
Parents: 39474ed
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Wed May 24 16:10:28 2017 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Wed May 24 18:54:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  22 +-
 .../org/apache/hadoop/tools/CopyListing.java    |  37 +-
 .../hadoop/tools/CopyListingFileStatus.java     |  87 +----
 .../java/org/apache/hadoop/tools/DistCp.java    |  52 ---
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  10 -
 .../org/apache/hadoop/tools/DistCpOptions.java  |  22 +-
 .../org/apache/hadoop/tools/OptionsParser.java  |  36 +-
 .../apache/hadoop/tools/SimpleCopyListing.java  |  83 ++---
 .../hadoop/tools/mapred/CopyCommitter.java      | 174 +--------
 .../apache/hadoop/tools/mapred/CopyMapper.java  |  40 +-
 .../tools/mapred/RetriableFileCopyCommand.java  |  26 +-
 .../tools/mapred/UniformSizeInputFormat.java    |   5 +-
 .../apache/hadoop/tools/util/DistCpUtils.java   | 111 +-----
 .../src/site/markdown/DistCp.md.vm              |   1 -
 .../apache/hadoop/tools/TestDistCpSystem.java   | 368 ++-----------------
 .../apache/hadoop/tools/TestOptionsParser.java  |   2 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java  |   5 +-
 17 files changed, 110 insertions(+), 971 deletions(-)
----------------------------------------------------------------------
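
For context, the reverted feature (driven by the -blocksperchunk switch) split a
large source file into chunks of <blocksperchunk> blocks so that the chunks could
be copied by separate map tasks in parallel and reassembled on the destination
with FileSystem.concat(), tracking each chunk's <chunkOffset, chunkLength>. The
sketch below only illustrates how chunk boundaries would fall; the class name and
the concrete sizes are hypothetical, while the semantics follow the removed
documentation and the CopyListingFileStatus fields shown in the diff below.

    public class BlocksPerChunkSketch {
      public static void main(String[] args) {
        // Hypothetical numbers, for illustration only.
        long fileLength = 1280L * 1024 * 1024;  // a 1280 MB source file
        long blockSize = 128L * 1024 * 1024;    // 128 MB blocks
        int blocksPerChunk = 4;                 // value given to -blocksperchunk

        // Each chunk spans blocksPerChunk blocks; the last chunk is shorter if
        // the file length is not an exact multiple of the chunk size.
        long chunkBytes = blockSize * blocksPerChunk;
        for (long offset = 0; offset < fileLength; offset += chunkBytes) {
          long length = Math.min(chunkBytes, fileLength - offset);
          System.out.println("chunk offset=" + offset + ", length=" + length);
        }
      }
    }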


http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 087d8f4..033b81d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -841,27 +841,7 @@ public class DFSTestUtil {
       out.write(toAppend);
     }
   }
-
-  /**
-   * Append specified length of bytes to a given file, starting with new block.
-   * @param fs The file system
-   * @param p Path of the file to append
-   * @param length Length of bytes to append to the file
-   * @throws IOException
-   */
-  public static void appendFileNewBlock(DistributedFileSystem fs,
-      Path p, int length) throws IOException {
-    assert fs.exists(p);
-    assert length >= 0;
-    byte[] toAppend = new byte[length];
-    Random random = new Random();
-    random.nextBytes(toAppend);
-    try (FSDataOutputStream out = fs.append(p,
-        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
-      out.write(toAppend);
-    }
-  }
-
+  
   /**
    * @return url content as string (UTF-8 encoding assumed)
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 9ebf9d2..481aa61 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -145,22 +145,12 @@ public abstract class CopyListing extends Configured {
     Configuration config = getConf();
     FileSystem fs = pathToListFile.getFileSystem(config);
 
-    final boolean splitLargeFile = options.splitLargeFile();
-
-    // When splitLargeFile is enabled, we don't randomize the copylist
-    // earlier, so we don't do the sorting here. For a file that has
-    // multiple entries due to split, we check here that their
-    // <chunkOffset, chunkLength> is continuous.
-    //
-    Path checkPath = splitLargeFile?
-        pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile);
+    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
 
     SequenceFile.Reader reader = new SequenceFile.Reader(
-                          config, SequenceFile.Reader.file(checkPath));
+                          config, SequenceFile.Reader.file(sortedList));
     try {
       Text lastKey = new Text("*"); //source relative path can never hold *
-      long lastChunkOffset = -1;
-      long lastChunkLength = -1;
       CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
 
       Text currentKey = new Text();
@@ -171,21 +161,8 @@ public abstract class CopyListing extends Configured {
         if (currentKey.equals(lastKey)) {
           CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
           reader.getCurrentValue(currentFileStatus);
-          if (!splitLargeFile) {
-            throw new DuplicateFileException("File " + lastFileStatus.getPath()
-                + " and " + currentFileStatus.getPath()
-                + " would cause duplicates. Aborting");
-          } else {
-            if (lastChunkOffset + lastChunkLength !=
-                currentFileStatus.getChunkOffset()) {
-              throw new InvalidInputException("File " + lastFileStatus.getPath()
-                  + " " + lastChunkOffset + "," + lastChunkLength
-                  + " and " + currentFileStatus.getPath()
-                  + " " + currentFileStatus.getChunkOffset() + ","
-                  + currentFileStatus.getChunkLength()
-                  + " are not continuous. Aborting");
-            }
-          }
+          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
+              currentFileStatus.getPath() + " would cause duplicates. Aborting");
         }
         reader.getCurrentValue(lastFileStatus);
         if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
@@ -204,12 +181,8 @@ public abstract class CopyListing extends Configured {
             xAttrSupportCheckFsSet.add(lastFsUri);
           }
         }
-
         lastKey.set(currentKey);
-        if (splitLargeFile) {
-          lastChunkOffset = lastFileStatus.getChunkOffset();
-          lastChunkLength = lastFileStatus.getChunkLength();
-        }
+
         if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
           LOG.debug("Copy list entry " + idx + ": " +
                   lastFileStatus.getPath().toUri().getPath());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
index 5395fa9..2b1e7e4 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
@@ -74,14 +74,6 @@ public final class CopyListingFileStatus implements Writable {
   private List<AclEntry> aclEntries;
   private Map<String, byte[]> xAttrs;
 
-  // <chunkOffset, chunkLength> represents the offset and length of a file
-  // chunk in number of bytes.
-  // used when splitting a large file to chunks to copy in parallel.
-  // If a file is not large enough to split, chunkOffset would be 0 and
-  // chunkLength would be the length of the file.
-  private long chunkOffset = 0;
-  private long chunkLength = Long.MAX_VALUE;
-
   /**
    * Default constructor.
    */
@@ -104,32 +96,11 @@ public final class CopyListingFileStatus implements Writable {
         fileStatus.getPath());
   }
 
-  public CopyListingFileStatus(FileStatus fileStatus,
-      long chunkOffset, long chunkLength) {
-    this(fileStatus.getLen(), fileStatus.isDirectory(),
-        fileStatus.getReplication(), fileStatus.getBlockSize(),
-        fileStatus.getModificationTime(), fileStatus.getAccessTime(),
-        fileStatus.getPermission(), fileStatus.getOwner(),
-        fileStatus.getGroup(),
-        fileStatus.getPath());
-    this.chunkOffset = chunkOffset;
-    this.chunkLength = chunkLength;
-  }
-
   @SuppressWarnings("checkstyle:parameternumber")
   public CopyListingFileStatus(long length, boolean isdir,
       int blockReplication, long blocksize, long modificationTime,
       long accessTime, FsPermission permission, String owner, String group,
       Path path) {
-    this(length, isdir, blockReplication, blocksize, modificationTime,
-        accessTime, permission, owner, group, path, 0, Long.MAX_VALUE);
-  }
-
-  @SuppressWarnings("checkstyle:parameternumber")
-  public CopyListingFileStatus(long length, boolean isdir,
-      int blockReplication, long blocksize, long modificationTime,
-      long accessTime, FsPermission permission, String owner, String group,
-      Path path, long chunkOffset, long chunkLength) {
     this.length = length;
     this.isdir = isdir;
     this.blockReplication = (short)blockReplication;
@@ -146,23 +117,6 @@ public final class CopyListingFileStatus implements Writable {
     this.owner = (owner == null) ? "" : owner;
     this.group = (group == null) ? "" : group;
     this.path = path;
-    this.chunkOffset = chunkOffset;
-    this.chunkLength = chunkLength;
-  }
-
-  public CopyListingFileStatus(CopyListingFileStatus other) {
-    this.length = other.length;
-    this.isdir = other.isdir;
-    this.blockReplication = other.blockReplication;
-    this.blocksize = other.blocksize;
-    this.modificationTime = other.modificationTime;
-    this.accessTime = other.accessTime;
-    this.permission = other.permission;
-    this.owner = other.owner;
-    this.group = other.group;
-    this.path = new Path(other.path.toUri());
-    this.chunkOffset = other.chunkOffset;
-    this.chunkLength = other.chunkLength;
   }
 
   public Path getPath() {
@@ -242,31 +196,6 @@ public final class CopyListingFileStatus implements Writable {
     this.xAttrs = xAttrs;
   }
 
-  public long getChunkOffset() {
-    return chunkOffset;
-  }
-
-  public void setChunkOffset(long offset) {
-    this.chunkOffset = offset;
-  }
-
-  public long getChunkLength() {
-    return chunkLength;
-  }
-
-  public void setChunkLength(long chunkLength) {
-    this.chunkLength = chunkLength;
-  }
-
-  public boolean isSplit() {
-    return getChunkLength() != Long.MAX_VALUE &&
-        getChunkLength() != getLen();
-  }
-
-  public long getSizeToCopy() {
-    return isSplit()? getChunkLength() : getLen();
-  }
-
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN);
@@ -311,9 +240,6 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       out.writeInt(NO_XATTRS);
     }
-
-    out.writeLong(chunkOffset);
-    out.writeLong(chunkLength);
   }
 
   @Override
@@ -362,9 +288,6 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       xAttrs = null;
     }
-
-    chunkOffset = in.readLong();
-    chunkLength = in.readLong();
   }
 
   @Override
@@ -390,14 +313,8 @@ public final class CopyListingFileStatus implements Writable {
   public String toString() {
     StringBuilder sb = new StringBuilder(super.toString());
     sb.append('{');
-    sb.append(this.getPath().toString());
-    sb.append(" length = ").append(this.getLen());
-    sb.append(" aclEntries = ").append(aclEntries);
-    sb.append(", xAttrs = ").append(xAttrs);
-    if (isSplit()) {
-      sb.append(", chunkOffset = ").append(this.getChunkOffset());
-      sb.append(", chunkLength = ").append(this.getChunkLength());
-    }
+    sb.append("aclEntries = " + aclEntries);
+    sb.append(", xAttrs = " + xAttrs);
     sb.append('}');
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index ddf67ff..7b0d9f2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.CopyListing.*;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@@ -134,7 +133,6 @@ public class DistCp extends Configured implements Tool {
     
     try {
       inputOptions = (OptionsParser.parse(argv));
-      setOptionsForSplitLargeFile();
       setTargetPathExists();
       LOG.info("Input Options: " + inputOptions);
     } catch (Throwable e) {
@@ -236,56 +234,6 @@ public class DistCp extends Configured implements Tool {
     getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
         targetExists);
   }
-
-  /**
-   * Check if concat is supported by fs.
-   * Throws UnsupportedOperationException if not.
-   */
-  private void checkConcatSupport(FileSystem fs) {
-    try {
-      Path[] src = null;
-      Path tgt = null;
-      fs.concat(tgt, src);
-    } catch (UnsupportedOperationException use) {
-      throw new UnsupportedOperationException(
-          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() +
-          " is not supported since the target file system doesn't" +
-          " support concat.", use);
-    } catch (Exception e) {
-      // Ignore other exception
-    }
-  }
-
-  /**
-   * Set up needed options for splitting large files.
-   */
-  private void setOptionsForSplitLargeFile() throws IOException {
-    if (!inputOptions.splitLargeFile()) {
-      return;
-    }
-    Path target = inputOptions.getTargetPath();
-    FileSystem targetFS = target.getFileSystem(getConf());
-    checkConcatSupport(targetFS);
-
-    LOG.info("Enabling preserving blocksize since "
-        + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
-    inputOptions.preserve(FileAttribute.BLOCKSIZE);
-
-    LOG.info("Set " +
-        DistCpOptionSwitch.APPEND.getSwitch()
-        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
-        + " is passed.");
-    inputOptions.setAppend(false);
-
-    LOG.info("Set " +
-        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES
-        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
-        + " is passed.");
-    getConf().setBoolean(
-        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
-  }
-
-
   /**
    * Create Job object for submitting it, with all the configuration
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e76a48e..b000791 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -178,16 +178,6 @@ public enum DistCpOptionSwitch {
       new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
               "copied to <= n bytes")),
 
-  BLOCKS_PER_CHUNK("",
-      new Option("blocksperchunk", true, "If set to a positive value, files"
-          + "with more blocks than this value will be split into chunks of "
-          + "<blocksperchunk> blocks to be transferred in parallel, and "
-          + "reassembled on the destination. By default, <blocksperchunk> is "
-          + "0 and the files will be transmitted in their entirety without "
-          + "splitting. This switch is only applicable when the source file "
-          + "system implements getBlockLocations method and the target file "
-          + "system implements concat method")),
-
   /**
    * Specify bandwidth per map in MB
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 2efb96b..c61816a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -99,11 +99,7 @@ public class DistCpOptions {
   // targetPathExist is a derived field, it's initialized in the
   // beginning of distcp.
   private boolean targetPathExists = true;
-
-  // Size of chunk in number of blocks when splitting large file into chunks
-  // to copy in parallel. Default is 0 and file are not splitted.
-  private int blocksPerChunk = 0;
-
+  
   public static enum FileAttribute{
     REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
 
@@ -173,7 +169,6 @@ public class DistCpOptions {
       this.targetPath = that.getTargetPath();
       this.targetPathExists = that.getTargetPathExists();
       this.filtersFile = that.getFiltersFile();
-      this.blocksPerChunk = that.blocksPerChunk;
     }
   }
 
@@ -628,18 +623,6 @@ public class DistCpOptions {
     this.filtersFile = filtersFilename;
   }
 
-  public final void setBlocksPerChunk(int csize) {
-    this.blocksPerChunk = csize;
-  }
-
-  public final int getBlocksPerChunk() {
-    return blocksPerChunk;
-  }
-
-  public final boolean splitLargeFile() {
-    return blocksPerChunk > 0;
-  }
-
   public void validate(DistCpOptionSwitch option, boolean value) {
 
     boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@@ -734,8 +717,6 @@ public class DistCpOptions {
       DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
           filtersFile);
     }
-    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
-        String.valueOf(blocksPerChunk));
   }
 
   /**
@@ -772,7 +753,6 @@ public class DistCpOptions {
         ", targetPath=" + targetPath +
         ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
-        ", blocksPerChunk=" + blocksPerChunk +
         '}';
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index c68102d..af3cb92 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -184,42 +184,9 @@ public class OptionsParser {
           DistCpOptionSwitch.FILTERS.getSwitch()));
     }
 
-    parseBlocksPerChunk(command, option);
-
     return option;
   }
 
-
-  /**
-   * A helper method to parse chunk size in number of blocks.
-   * Used when breaking large file into chunks to copy in parallel.
-   *
-   * @param command command line arguments
-   */
-  private static void parseBlocksPerChunk(CommandLine command,
-      DistCpOptions option) {
-    boolean hasOption =
-        command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
-    LOG.info("parseChunkSize: " +
-        DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption);
-    if (hasOption) {
-      String chunkSizeString = getVal(command,
-          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
-      try {
-        int csize = Integer.parseInt(chunkSizeString);
-        if (csize < 0) {
-          csize = 0;
-        }
-        LOG.info("Set distcp blocksPerChunk to " + csize);
-        option.setBlocksPerChunk(csize);
-      }
-      catch (NumberFormatException e) {
-        throw new IllegalArgumentException("blocksPerChunk is invalid: "
-            + chunkSizeString, e);
-      }
-    }
-  }
-
   /**
    * parseSizeLimit is a helper method for parsing the deprecated
    * argument SIZE_LIMIT.
@@ -254,7 +221,8 @@ public class OptionsParser {
                               DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
       try {
         Integer.parseInt(fileLimitString);
-      } catch (NumberFormatException e) {
+      }
+      catch (NumberFormatException e) {
         throw new IllegalArgumentException("File-limit is invalid: "
                                             + fileLimitString, e);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index af91347..105e4f2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.tools;
 
 import com.google.common.collect.Lists;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -48,7 +47,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
-import java.util.LinkedList;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -242,10 +240,10 @@ public class SimpleCopyListing extends CopyListing {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
-    LinkedList<CopyListingFileStatus> fileCopyListingStatus =
+    CopyListingFileStatus fileCopyListingStatus =
         DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
-            preserveAcls, preserveXAttrs, preserveRawXAttrs,
-            options.getBlocksPerChunk());
+            preserveAcls, preserveXAttrs, preserveRawXAttrs);
+
     writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
         sourceRoot, options);
   }
@@ -350,10 +348,9 @@ public class SimpleCopyListing extends CopyListing {
         FileStatus[] sourceFiles = sourceFS.listStatus(path);
         boolean explore = (sourceFiles != null && sourceFiles.length > 0);
         if (!explore || rootStatus.isDirectory()) {
-          LinkedList<CopyListingFileStatus> rootCopyListingStatus =
-              DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
-                  preserveAcls, preserveXAttrs, preserveRawXAttrs,
-                  options.getBlocksPerChunk());
+          CopyListingFileStatus rootCopyListingStatus =
+            DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
+                preserveAcls, preserveXAttrs, preserveRawXAttrs);
           writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
               sourcePathRoot, options);
         }
@@ -363,20 +360,20 @@ public class SimpleCopyListing extends CopyListing {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
             }
-            LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
-                DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
-                    preserveAcls && sourceStatus.isDirectory(),
-                    preserveXAttrs && sourceStatus.isDirectory(),
-                    preserveRawXAttrs && sourceStatus.isDirectory(),
-                    options.getBlocksPerChunk());
-            for (CopyListingFileStatus fs : sourceCopyListingStatus) {
-              if (randomizeFileListing) {
-                addToFileListing(statusList,
-                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
-              } else {
-                writeToFileListing(fileListWriter, fs, sourcePathRoot);
-              }
+            CopyListingFileStatus sourceCopyListingStatus =
+              DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+                  preserveAcls && sourceStatus.isDirectory(),
+                  preserveXAttrs && sourceStatus.isDirectory(),
+                  preserveRawXAttrs && sourceStatus.isDirectory());
+            if (randomizeFileListing) {
+              addToFileListing(statusList,
+                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, sourceCopyListingStatus,
+                  sourcePathRoot);
             }
+
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
@@ -644,20 +641,18 @@ public class SimpleCopyListing extends CopyListing {
             LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
           }
           if (workResult.getSuccess()) {
-            LinkedList<CopyListingFileStatus> childCopyListingStatus =
+            CopyListingFileStatus childCopyListingStatus =
               DistCpUtils.toCopyListingFileStatus(sourceFS, child,
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
-                preserveRawXattrs && child.isDirectory(),
-                options.getBlocksPerChunk());
-
-            for (CopyListingFileStatus fs : childCopyListingStatus) {
-              if (randomizeFileListing) {
-                addToFileListing(fileStatuses,
-                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
-              } else {
-                writeToFileListing(fileListWriter, fs, sourcePathRoot);
-              }
+                preserveRawXattrs && child.isDirectory());
+            if (randomizeFileListing) {
+              addToFileListing(fileStatuses,
+                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, childCopyListingStatus,
+                  sourcePathRoot);
             }
           }
           if (retry < maxRetries) {
@@ -680,21 +675,19 @@ public class SimpleCopyListing extends CopyListing {
   }
 
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
-      LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
+      CopyListingFileStatus fileStatus, Path sourcePathRoot,
       DistCpOptions options) throws IOException {
     boolean syncOrOverwrite = options.shouldSyncFolder() ||
         options.shouldOverwrite();
-    for (CopyListingFileStatus fs : fileStatus) {
-      if (fs.getPath().equals(sourcePathRoot) &&
-          fs.isDirectory() && syncOrOverwrite) {
-        // Skip the root-paths when syncOrOverwrite
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip " + fs.getPath());
-        }
-        return;
-      }
-      writeToFileListing(fileListWriter, fs, sourcePathRoot);
+    if (fileStatus.getPath().equals(sourcePathRoot) && 
+        fileStatus.isDirectory() && syncOrOverwrite) {
+      // Skip the root-paths when syncOrOverwrite
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip " + fileStatus.getPath());
+      }      
+      return;
     }
+    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot);
   }
 
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
@@ -714,7 +707,7 @@ public class SimpleCopyListing extends CopyListing {
     fileListWriter.sync();
 
     if (!fileStatus.isDirectory()) {
-      totalBytesToCopy += fileStatus.getSizeToCopy();
+      totalBytesToCopy += fileStatus.getLen();
     } else {
       totalDirs++;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 6ddaab9..75cefb4 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -35,17 +34,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.CopyListing;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.GlobbedCopyListing;
 import org.apache.hadoop.tools.util.DistCpUtils;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -67,8 +63,7 @@ public class CopyCommitter extends FileOutputCommitter {
   private boolean syncFolder = false;
   private boolean overwrite = false;
   private boolean targetPathExists = true;
-  private boolean ignoreFailures = false;
-
+  
   /**
    * Create a output committer
    *
@@ -87,13 +82,8 @@ public class CopyCommitter extends FileOutputCommitter {
     Configuration conf = jobContext.getConfiguration();
     syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false);
     overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false);
-    targetPathExists = conf.getBoolean(
-        DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
-    ignoreFailures = conf.getBoolean(
-        DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
-
-    concatFileChunks(conf);
-
+    targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
+    
     super.commitJob(jobContext);
 
     cleanupTempFiles(jobContext);
@@ -179,112 +169,9 @@ public class CopyCommitter extends FileOutputCommitter {
     }
   }
 
-  private boolean isFileNotFoundException(IOException e) {
-    if (e instanceof FileNotFoundException) {
-      return true;
-    }
-
-    if (e instanceof RemoteException) {
-      return ((RemoteException)e).unwrapRemoteException()
-          instanceof FileNotFoundException;
-    }
-
-    return false;
-  }
-
-  /**
-   * Concat chunk files for the same file into one.
-   * Iterate through copy listing, identify chunk files for the same file,
-   * concat them into one.
-   */
-  private void concatFileChunks(Configuration conf) throws IOException {
-
-    LOG.info("concat file chunks ...");
-
-    String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
-    if (spath == null || spath.isEmpty()) {
-      return;
-    }
-    Path sourceListing = new Path(spath);
-    SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
-                                      SequenceFile.Reader.file(sourceListing));
-    Path targetRoot =
-        new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
-
-    try {
-      CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
-      Text srcRelPath = new Text();
-      CopyListingFileStatus lastFileStatus = null;
-      LinkedList<Path> allChunkPaths = new LinkedList<Path>();
-
-      // Iterate over every source path that was copied.
-      while (sourceReader.next(srcRelPath, srcFileStatus)) {
-        if (srcFileStatus.isDirectory()) {
-          continue;
-        }
-        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
-        Path targetFileChunkPath =
-            DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("  add " + targetFileChunkPath + " to concat.");
-        }
-        allChunkPaths.add(targetFileChunkPath);
-        if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength()
-            == srcFileStatus.getLen()) {
-          // This is the last chunk of the splits, consolidate allChunkPaths
-          try {
-            concatFileChunks(conf, targetFile, allChunkPaths);
-          } catch (IOException e) {
-            // If the concat failed because a chunk file doesn't exist,
-            // then we assume that the CopyMapper has skipped copying this
-            // file, and we ignore the exception here.
-            // If a chunk file should have been created but it was not, then
-            // the CopyMapper would have failed.
-            if (!isFileNotFoundException(e)) {
-              String emsg = "Failed to concat chunk files for " + targetFile;
-              if (!ignoreFailures) {
-                throw new IOException(emsg, e);
-              } else {
-                LOG.warn(emsg, e);
-              }
-            }
-          }
-          allChunkPaths.clear();
-          lastFileStatus = null;
-        } else {
-          if (lastFileStatus == null) {
-            lastFileStatus = new CopyListingFileStatus(srcFileStatus);
-          } else {
-            // Two neighboring chunks have to be consecutive ones for the same
-            // file, for them to be merged
-            if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) ||
-                srcFileStatus.getChunkOffset() !=
-                (lastFileStatus.getChunkOffset() +
-                lastFileStatus.getChunkLength())) {
-              String emsg = "Inconsistent sequence file: current " +
-                  "chunk file " + srcFileStatus + " doesnt match prior " +
-                  "entry " + lastFileStatus;
-              if (!ignoreFailures) {
-                throw new IOException(emsg);
-              } else {
-                LOG.warn(emsg + ", skipping concat this set.");
-              }
-            } else {
-              lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset());
-              lastFileStatus.setChunkLength(srcFileStatus.getChunkLength());
-            }
-          }
-        }
-      }
-    } finally {
-      IOUtils.closeStream(sourceReader);
-    }
-  }
-
   // This method changes the target-directories' file-attributes (owner,
   // user/group permissions, etc.) based on the corresponding source directories.
-  private void preserveFileAttributesForDirectories(Configuration conf)
-      throws IOException {
+  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
     String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
     final boolean syncOrOverwrite = syncFolder || overwrite;
 
@@ -438,57 +325,4 @@ public class CopyCommitter extends FileOutputCommitter {
         ", Unable to move to " + finalDir);
     }
   }
-
-  /**
-   * Concat the passed chunk files into one and rename it the targetFile.
-   */
-  private void concatFileChunks(Configuration conf, Path targetFile,
-      LinkedList<Path> allChunkPaths) throws IOException {
-    if (allChunkPaths.size() == 1) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("concat " + targetFile + " allChunkSize+ "
-          + allChunkPaths.size());
-    }
-    FileSystem dstfs = targetFile.getFileSystem(conf);
-
-    Path firstChunkFile = allChunkPaths.removeFirst();
-    Path[] restChunkFiles = new Path[allChunkPaths.size()];
-    allChunkPaths.toArray(restChunkFiles);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile));
-      int i = 0;
-      for (Path f : restChunkFiles) {
-        LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f));
-        ++i;
-      }
-    }
-    dstfs.concat(firstChunkFile, restChunkFiles);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
-    }
-    rename(dstfs, firstChunkFile, targetFile);
-  }
-
-  /**
-   * Rename tmp to dst on destFileSys.
-   * @param destFileSys the file ssystem
-   * @param tmp the source path
-   * @param dst the destination path
-   * @throws IOException if renaming failed
-   */
-  private static void rename(FileSystem destFileSys, Path tmp, Path dst)
-      throws IOException {
-    try {
-      if (destFileSys.exists(dst)) {
-        destFileSys.delete(dst, true);
-      }
-      destFileSys.rename(tmp, dst);
-    } catch (IOException ioe) {
-      throw new IOException("Fail to rename tmp file (=" + tmp
-          + ") to destination file (=" + dst + ")", ioe);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index 53a95ee..41c5d78 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -219,12 +219,10 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         sourceFS = sourcePath.getFileSystem(conf);
         final boolean preserveXAttrs =
             fileAttributes.contains(FileAttribute.XATTR);
-        sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS,
-            sourceFS.getFileStatus(sourcePath),
-            fileAttributes.contains(FileAttribute.ACL),
-            preserveXAttrs, preserveRawXattrs,
-            sourceFileStatus.getChunkOffset(),
-            sourceFileStatus.getChunkLength());
+        sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
+          sourceFS.getFileStatus(sourcePath),
+          fileAttributes.contains(FileAttribute.ACL), 
+          preserveXAttrs, preserveRawXattrs);
       } catch (FileNotFoundException e) {
         throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
       }
@@ -238,8 +236,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
           LOG.debug("Path could not be found: " + target, ignore);
       }
 
-      if (targetStatus != null &&
-          (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
+      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
         throw new IOException("Can't replace " + target + ". Target is " +
             getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
       }
@@ -249,28 +246,19 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         return;
       }
 
-      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target,
-          targetStatus);
-
-      Path tmpTarget = target;
+      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus);
       if (action == FileAction.SKIP) {
         LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                  + " to " + target);
         updateSkipCounters(context, sourceCurrStatus);
         context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
-
       } else {
-        if (sourceCurrStatus.isSplit()) {
-          tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
-        }
-        copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context,
+        copyFileWithRetry(description, sourceCurrStatus, target, context,
             action, fileAttributes);
       }
-      DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
-          sourceCurrStatus, fileAttributes, preserveRawXattrs);
+
+      DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
+          fileAttributes, preserveRawXattrs);
     } catch (IOException exception) {
       handleFailures(exception, sourceFileStatus, target, context);
     }
@@ -335,12 +323,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private void handleFailures(IOException exception,
       CopyListingFileStatus sourceFileStatus, Path target, Context context)
       throws IOException, InterruptedException {
-    LOG.error("Failure in copying " + sourceFileStatus.getPath() +
-        (sourceFileStatus.isSplit()? ","
-            + " offset=" + sourceFileStatus.getChunkOffset()
-            + " chunkLength=" + sourceFileStatus.getChunkLength()
-            : "") +
-        " to " + target, exception);
+    LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
+                target, exception);
 
     if (ignoreFailures &&
         ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 58a51af..1777364 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -118,21 +118,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
           .getFileChecksum(sourcePath) : null;
 
-      long offset = (action == FileAction.APPEND) ?
-          targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
+      final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
+          target).getLen() : 0;
       long bytesRead = copyToFile(targetPath, targetFS, source,
           offset, context, fileAttributes, sourceChecksum);
 
-      if (!source.isSplit()) {
-        compareFileLengths(source, targetPath, configuration, bytesRead
-            + offset);
-      }
+      compareFileLengths(source, targetPath, configuration, bytesRead
+          + offset);
       //At this point, src&dest lengths are same. if length==0, we skip checksum
       if ((bytesRead != 0) && (!skipCrc)) {
-        if (!source.isSplit()) {
-          compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
-              targetFS, targetPath);
-        }
+        compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
+            targetFS, targetPath);
       }
       // it's not append case, thus we first write to a temporary file, rename
       // it to the target path.
@@ -250,26 +246,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     ThrottledInputStream inStream = null;
     long totalBytesRead = 0;
 
-    long chunkLength = source2.getChunkLength();
-    boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
       int bytesRead = readBytes(inStream, buf, sourceOffset);
       while (bytesRead >= 0) {
-        if (chunkLength > 0 &&
-            (totalBytesRead + bytesRead) >= chunkLength) {
-          bytesRead = (int)(chunkLength - totalBytesRead);
-          finished = true;
-        }
         totalBytesRead += bytesRead;
         if (action == FileAction.APPEND) {
           sourceOffset += bytesRead;
         }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, source2);
-        if (finished) {
-          break;
-        }
         bytesRead = readBytes(inStream, buf, sourceOffset);
       }
       outStream.close();
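
The hunk above strips the chunk-length bookkeeping out of the copy loop; with -blocksperchunk in effect, the copier has to stop after exactly chunkLength bytes instead of draining the source stream. A rough, self-contained sketch of such a length-bounded copy loop (buffer size and stream types are illustrative and independent of the DistCp classes):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    final class BoundedCopySketch {
      // Copy at most 'limit' bytes; limit <= 0 means copy to end of stream.
      static long copyBounded(InputStream in, OutputStream out, long limit)
          throws IOException {
        byte[] buf = new byte[8 * 1024];
        long total = 0;
        int read;
        while ((read = in.read(buf)) >= 0) {
          boolean finished = false;
          if (limit > 0 && total + read >= limit) {
            read = (int) (limit - total); // trim final buffer at the chunk boundary
            finished = true;
          }
          out.write(buf, 0, read);
          total += read;
          if (finished) {
            break;
          }
        }
        return total;
      }
    }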

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
index d1c18ea..3e86d09 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
@@ -99,8 +99,7 @@ public class UniformSizeInputFormat
       while (reader.next(srcRelPath, srcFileStatus)) {
         // If adding the current file would cause the bytes per map to exceed
         // limit. Add the current file to new split
-        if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit
-            && lastPosition != 0) {
+        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
           FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
               lastPosition - lastSplitStart, null);
           if (LOG.isDebugEnabled()) {
@@ -110,7 +109,7 @@ public class UniformSizeInputFormat
           lastSplitStart = lastPosition;
           currentSplitSize = 0;
         }
-        currentSplitSize += srcFileStatus.getChunkLength();
+        currentSplitSize += srcFileStatus.getLen();
         lastPosition = reader.getPosition();
       }
       if (lastPosition > lastSplitStart) {
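
Either accounting (getLen() after the revert, getChunkLength() with chunked listings) feeds the same grouping rule visible in the context above: accumulate listing entries until the running size would exceed the per-split budget, then start a new split. A standalone sketch of that grouping, with plain longs standing in for listing entries:

    import java.util.ArrayList;
    import java.util.List;

    final class SplitGroupingSketch {
      // Close a split before adding an entry would push it past bytesPerSplit;
      // a single oversized entry still gets a split of its own.
      static List<List<Long>> group(long[] entrySizes, long bytesPerSplit) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : entrySizes) {
          if (currentSize + size > bytesPerSplit && !current.isEmpty()) {
            splits.add(current);
            current = new ArrayList<>();
            currentSize = 0;
          }
          current.add(size);
          currentSize += size;
        }
        if (!current.isEmpty()) {
          splits.add(current);
        }
        return splits;
      }
    }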

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 29715bb..c308e6f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -19,11 +19,9 @@
 package org.apache.hadoop.tools.util;
 
 import com.google.common.collect.Maps;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,7 +30,6 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -47,7 +44,6 @@ import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.EnumSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -120,7 +116,7 @@ public class DistCpUtils {
    * @return Class implementing the strategy specified in options.
    */
   public static Class<? extends InputFormat> getStrategy(Configuration conf,
-      DistCpOptions options) {
+                                                                 DistCpOptions options) {
     String confLabel = "distcp."
         + StringUtils.toLowerCase(options.getCopyStrategy())
         + ".strategy" + ".impl";
@@ -297,86 +293,6 @@ public class DistCpUtils {
   }
 
   /**
-   * Converts FileStatus to a list of CopyListingFileStatus.
-   * The resulted list contains either one CopyListingFileStatus per chunk of
-   * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there
-   * are more blocks in the file than blocksperChunk), or a single
-   * CopyListingFileStatus for the entire file (if file-size is too small to
-   * split).
-   * If preserving ACLs, populates the CopyListingFileStatus with the ACLs.
-   * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs.
-   *
-   * @param fileSystem FileSystem containing the file
-   * @param fileStatus FileStatus of file
-   * @param preserveAcls boolean true if preserving ACLs
-   * @param preserveXAttrs boolean true if preserving XAttrs
-   * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
-   * @param blocksPerChunk size of chunks when copying chunks in parallel
-   * @return list of CopyListingFileStatus
-   * @throws IOException if there is an I/O error
-   */
-  public static LinkedList<CopyListingFileStatus> toCopyListingFileStatus(
-      FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
-      boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk)
-          throws IOException {
-    LinkedList<CopyListingFileStatus> copyListingFileStatus =
-        new LinkedList<CopyListingFileStatus>();
-
-    final CopyListingFileStatus clfs = toCopyListingFileStatusHelper(
-        fileSystem, fileStatus, preserveAcls,
-        preserveXAttrs, preserveRawXAttrs,
-        0, fileStatus.getLen());
-    final long blockSize = fileStatus.getBlockSize();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("toCopyListing: " + fileStatus + " chunkSize: "
-          + blocksPerChunk + " isDFS: " +
-          (fileSystem instanceof DistributedFileSystem));
-    }
-    if ((blocksPerChunk > 0) &&
-        !fileStatus.isDirectory() &&
-        (fileStatus.getLen() > blockSize * blocksPerChunk)) {
-      // split only when the file size is larger than the intended chunk size
-      final BlockLocation[] blockLocations;
-      blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0,
-            fileStatus.getLen());
-
-      int numBlocks = blockLocations.length;
-      long curPos = 0;
-      if (numBlocks <= blocksPerChunk) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("  add file " + clfs);
-        }
-        copyListingFileStatus.add(clfs);
-      } else {
-        int i = 0;
-        while (i < numBlocks) {
-          long curLength = 0;
-          for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) {
-            curLength += blockLocations[i].getLength();
-          }
-          if (curLength > 0) {
-            CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs);
-            clfs1.setChunkOffset(curPos);
-            clfs1.setChunkLength(curLength);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("  add file chunk " + clfs1);
-            }
-            copyListingFileStatus.add(clfs1);
-            curPos += curLength;
-          }
-        }
-      }
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("  add file/dir " + clfs);
-      }
-      copyListingFileStatus.add(clfs);
-    }
-
-    return copyListingFileStatus;
-  }
-
-  /**
    * Converts a FileStatus to a CopyListingFileStatus.  If preserving ACLs,
    * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
    * populates the CopyListingFileStatus with the XAttrs.
@@ -386,17 +302,13 @@ public class DistCpUtils {
    * @param preserveAcls boolean true if preserving ACLs
    * @param preserveXAttrs boolean true if preserving XAttrs
    * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
-   * @param chunkOffset chunk offset in bytes
-   * @param chunkLength chunk length in bytes
-   * @return CopyListingFileStatus
    * @throws IOException if there is an I/O error
    */
-  public static CopyListingFileStatus toCopyListingFileStatusHelper(
+  public static CopyListingFileStatus toCopyListingFileStatus(
       FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, 
-      boolean preserveXAttrs, boolean preserveRawXAttrs,
-      long chunkOffset, long chunkLength) throws IOException {
+      boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException {
     CopyListingFileStatus copyListingFileStatus =
-        new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength);
+      new CopyListingFileStatus(fileStatus);
     if (preserveAcls) {
       FsPermission perm = fileStatus.getPermission();
       if (perm.getAclBit()) {
@@ -553,19 +465,4 @@ public class DistCpUtils {
     return (sourceChecksum == null || targetChecksum == null ||
             sourceChecksum.equals(targetChecksum));
   }
-
-  /*
-   * Return the Path for a given chunk.
-   * Used when splitting large file into chunks to copy in parallel.
-   * @param targetFile path to target file
-   * @param srcFileStatus source file status in copy listing
-   * @return path to the chunk specified by the parameters to store
-   * in target cluster temporarily
-   */
-  public static Path getSplitChunkPath(Path targetFile,
-      CopyListingFileStatus srcFileStatus) {
-    return new Path(targetFile.toString()
-        + ".____distcpSplit____" + srcFileStatus.getChunkOffset()
-        + "." + srcFileStatus.getChunkLength());
-  }
 }
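
The removed helper above is where a large file was turned into multiple listing entries: walk the file's block locations and emit one <chunkOffset, chunkLength> pair per group of blocksPerChunk blocks. A simplified sketch of that chunking arithmetic over an array of block lengths (no FileSystem calls, illustrative types only):

    import java.util.ArrayList;
    import java.util.List;

    final class BlockChunkingSketch {
      // One {offset, length} pair per group of up to blocksPerChunk blocks.
      static List<long[]> chunkOffsets(long[] blockLengths, int blocksPerChunk) {
        List<long[]> chunks = new ArrayList<>();
        long offset = 0;
        int i = 0;
        while (i < blockLengths.length) {
          long length = 0;
          for (int j = 0; j < blocksPerChunk && i < blockLengths.length; ++j, ++i) {
            length += blockLengths[i];
          }
          if (length > 0) {
            chunks.add(new long[] {offset, length});
            offset += length;
          }
        }
        return chunks;
      }
    }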

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index e6cff10..d153485 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -239,7 +239,6 @@ Flag              | Description                          | Notes
 `-rdiff <newSnapshot> <oldSnapshot>` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `<oldSnapshot>` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's `<oldSnapshot>`, to make the target the same as `<oldSnapshot>`. | This option is valid only with `-update` option and the following conditions should be satisfied. <ol><li>Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target's `<oldSnapshot>` to target's current state).</li>  <li> Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on target since `<newSnapshot>` was created on the target. </li> <li> The source has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same with source's `<oldSnapshot>`.</li> </ol> |
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
-`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
 
 Architecture of DistCp
 ----------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
index b2266b3..e3018a0 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
@@ -23,27 +23,17 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -57,15 +47,11 @@ import org.junit.rules.Timeout;
  */
 
 public class TestDistCpSystem {
-  private static final Log LOG =
-      LogFactory.getLog(TestDistCpSystem.class);
-
   @Rule
   public Timeout globalTimeout = new Timeout(30000);
 
   private static final String SRCDAT = "srcdat";
   private static final String DSTDAT = "dstdat";
-  private static final long BLOCK_SIZE = 1024;
 
   private static MiniDFSCluster cluster;
   private static Configuration conf;
@@ -77,76 +63,27 @@ public class TestDistCpSystem {
       this.path = path;
       this.isDir = isDir;
     }
-
-    String getPath() {
-      return path;
-    }
-
-    boolean isDirectory() {
-      return isDir;
-    }
-  }
-
-  @BeforeClass
-  public static void beforeClass() throws IOException {
-    conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    cluster.waitActive();
-  }
-
-  @AfterClass
-  public static void afterClass() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  static String execCmd(FsShell shell, String... args) throws Exception {
-    ByteArrayOutputStream baout = new ByteArrayOutputStream();
-    PrintStream out = new PrintStream(baout, true);
-    PrintStream old = System.out;
-    System.setOut(out);
-    shell.run(args);
-    out.close();
-    System.setOut(old);
-    return baout.toString();
+    String getPath() { return path; }
+    boolean isDirectory() { return isDir; }
   }
   
-  private void createFiles(DistributedFileSystem fs, String topdir,
-      FileEntry[] entries, long chunkSize) throws IOException {
-    long seed = System.currentTimeMillis();
-    Random rand = new Random(seed);
-    short replicationFactor = 2;
+  private void createFiles(FileSystem fs, String topdir,
+      FileEntry[] entries) throws IOException {
     for (FileEntry entry : entries) {
-      Path newPath = new Path(topdir + "/" + entry.getPath());
+      Path newpath = new Path(topdir + "/" + entry.getPath());
       if (entry.isDirectory()) {
-        fs.mkdirs(newPath);
+        fs.mkdirs(newpath);
       } else {
-        long fileSize = BLOCK_SIZE *100;
-        int bufSize = 128;
-        if (chunkSize == -1) {
-          DFSTestUtil.createFile(fs, newPath, bufSize,
-              fileSize, BLOCK_SIZE, replicationFactor, seed);
-        } else {
-          // Create a variable length block file, by creating
-          // one block of half block size at the chunk boundary
-          long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2;
-          long seg2 = fileSize - seg1;
-          DFSTestUtil.createFile(fs, newPath, bufSize,
-              seg1, BLOCK_SIZE, replicationFactor, seed);
-          DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2);
+        OutputStream out = fs.create(newpath);
+        try {
+          out.write((topdir + "/" + entry).getBytes());
+          out.write("\n".getBytes());
+        } finally {
+          out.close();
         }
       }
-      seed = System.currentTimeMillis() + rand.nextLong();
     }
   }
-
-  private void createFiles(DistributedFileSystem fs, String topdir,
-      FileEntry[] entries) throws IOException {
-    createFiles(fs, topdir, entries, -1);
-  }
    
   private static FileStatus[] getFileStatus(FileSystem fs,
       String topdir, FileEntry[] files) throws IOException {
@@ -167,19 +104,18 @@ public class TestDistCpSystem {
   }
 
   private void testPreserveUserHelper(String testRoot,
-      FileEntry[] srcEntries,
-      FileEntry[] dstEntries,
-      boolean createSrcDir,
-      boolean createTgtDir,
-      boolean update) throws Exception {
+                                      FileEntry[] srcEntries,
+                                      FileEntry[] dstEntries,
+                                      boolean createSrcDir,
+                                      boolean createTgtDir,
+                                      boolean update) throws Exception {
     final String testSrcRel = SRCDAT;
     final String testSrc = testRoot + "/" + testSrcRel;
     final String testDstRel = DSTDAT;
     final String testDst = testRoot + "/" + testDstRel;
 
     String nnUri = FileSystem.getDefaultUri(conf).toString();
-    DistributedFileSystem fs = (DistributedFileSystem)
-        FileSystem.get(URI.create(nnUri), conf);
+    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
     fs.mkdirs(new Path(testRoot));
     if (createSrcDir) {
       fs.mkdirs(new Path(testSrc));
@@ -193,8 +129,8 @@ public class TestDistCpSystem {
     for(int i = 0; i < srcEntries.length; i++) {
       fs.setOwner(srcstats[i].getPath(), "u" + i, null);
     }
-    String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc,
-        nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst};
+    String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc,
+        nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst};
 
     ToolRunner.run(conf, new DistCp(), args);
 
@@ -209,261 +145,18 @@ public class TestDistCpSystem {
     deldir(fs, testRoot);
   }
 
-  private void compareFiles(FileSystem fs, FileStatus srcStat,
-      FileStatus dstStat) throws Exception {
-    LOG.info("Comparing " + srcStat + " and " + dstStat);
-    assertEquals(srcStat.isDirectory(), dstStat.isDirectory());
-    assertEquals(srcStat.getReplication(), dstStat.getReplication());
-    assertEquals("File POSIX permission should match",
-        srcStat.getPermission(), dstStat.getPermission());
-    assertEquals("File user ownership should match",
-        srcStat.getOwner(), dstStat.getOwner());
-    assertEquals("File group ownership should match",
-        srcStat.getGroup(), dstStat.getGroup());
-    // TODO; check ACL attributes
-
-    if (srcStat.isDirectory()) {
-      return;
-    }
-
-    assertEquals("File length should match (" + srcStat.getPath() + ")",
-        srcStat.getLen(), dstStat.getLen());
-
-    FSDataInputStream srcIn = fs.open(srcStat.getPath());
-    FSDataInputStream dstIn = fs.open(dstStat.getPath());
-    try {
-      byte[] readSrc = new byte[(int)
-                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
-      byte[] readDst = new byte[(int)
-                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
-
-      int srcBytesRead = 0, tgtBytesRead = 0;
-      int srcIdx = 0, tgtIdx = 0;
-      long totalComparedBytes = 0;
-      while (true) {
-        if (srcBytesRead == 0) {
-          srcBytesRead = srcIn.read(readSrc);
-          srcIdx = 0;
-        }
-        if (tgtBytesRead == 0) {
-          tgtBytesRead = dstIn.read(readDst);
-          tgtIdx = 0;
-        }
-        if (srcBytesRead == 0 || tgtBytesRead == 0) {
-          LOG.info("______ compared src and dst files for "
-              + totalComparedBytes + " bytes, content match.");
-          if (srcBytesRead != tgtBytesRead) {
-            Assert.fail("Read mismatching size, compared "
-                + totalComparedBytes + " bytes between src and dst file "
-                + srcStat + " and " + dstStat);
-          }
-          if (totalComparedBytes != srcStat.getLen()) {
-            Assert.fail("Only read/compared " + totalComparedBytes +
-                " bytes between src and dst file " + srcStat +
-                " and " + dstStat);
-          } else {
-            // success
-            break;
-          }
-        }
-        for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead;
-            ++srcIdx, ++tgtIdx) {
-          if (readSrc[srcIdx] != readDst[tgtIdx]) {
-            Assert.fail("src and dst file does not match at "
-                + totalComparedBytes + " between "
-                + srcStat + " and " + dstStat);
-          }
-          ++totalComparedBytes;
-        }
-        LOG.info("______ compared src and dst files for "
-            + totalComparedBytes + " bytes, content match. FileLength: "
-            + srcStat.getLen());
-        if (totalComparedBytes == srcStat.getLen()) {
-          LOG.info("______ Final:" + srcIdx + " "
-              + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead);
-          break;
-        }
-        if (srcIdx == srcBytesRead) {
-          srcBytesRead = 0;
-        }
-        if (tgtIdx == tgtBytesRead) {
-          tgtBytesRead = 0;
-        }
-      }
-    } finally {
-      if (srcIn != null) {
-        srcIn.close();
-      }
-      if (dstIn != null) {
-        dstIn.close();
-      }
-    }
-  }
-
-  // WC: needed because the current distcp does not create target dirs
-  private void createDestDir(FileSystem fs, String testDst,
-      FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException {
-    fs.mkdirs(new Path(testDst));
-
-    for (int i=0; i<srcStats.length; i++) {
-      FileStatus srcStat = srcStats[i];
-      if (srcStat.isDirectory()) {
-        Path dstPath = new Path(testDst, srcFiles[i].getPath());
-        fs.mkdirs(dstPath);
-        fs.setOwner(dstPath, srcStat.getOwner(), srcStat.getGroup());
-      }
-    }
-  }
-
-  private void copyAndVerify(final DistributedFileSystem fs,
-      final FileEntry[] srcFiles, final FileStatus[] srcStats,
-      final String testDst,
-      final String[] args) throws Exception {
-    final String testRoot = "/testdir";
-    FsShell shell = new FsShell(fs.getConf());
-
-    LOG.info("ls before distcp");
-    LOG.info(execCmd(shell, "-lsr", testRoot));
-
-    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
-    ToolRunner.run(conf, new DistCp(), args);
-
-    LOG.info("ls after distcp");
-    LOG.info(execCmd(shell, "-lsr", testRoot));
-
-    FileStatus[] dstStat = getFileStatus(fs, testDst, srcFiles);
-    for (int i=0; i< dstStat.length; i++) {
-      compareFiles(fs, srcStats[i], dstStat[i]);
-    }
-  }
-
-  private void chunkCopy(FileEntry[] srcFiles) throws Exception {
-    final String testRoot = "/testdir";
-    final String testSrcRel = SRCDAT;
-    final String testSrc = testRoot + "/" + testSrcRel;
-    final String testDstRel = DSTDAT;
-    final String testDst = testRoot + "/" + testDstRel;
-    long chunkSize = 8;
-
-    String nnUri = FileSystem.getDefaultUri(conf).toString();
-    DistributedFileSystem fs = (DistributedFileSystem)
-        FileSystem.get(URI.create(nnUri), conf);
-
-    createFiles(fs, testRoot, srcFiles, chunkSize);
-
-    FileStatus[] srcStats = getFileStatus(fs, testRoot, srcFiles);
-    for (int i = 0; i < srcFiles.length; i++) {
-      fs.setOwner(srcStats[i].getPath(), "u" + i,  "g" + i);
-    }
-    // get file status after updating owners
-    srcStats = getFileStatus(fs, testRoot, srcFiles);
-
-    createDestDir(fs, testDst, srcStats, srcFiles);
-
-    String[] args = new String[] {"-pugp", "-blocksperchunk",
-        String.valueOf(chunkSize),
-        nnUri + testSrc, nnUri + testDst};
-
-    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
-    // Do it again
-    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
-
-    // modify last file and rerun distcp with -update option
-    LOG.info("Modify a file and copy again");
-    for(int i=srcFiles.length-1; i >=0; --i) {
-      if (!srcFiles[i].isDirectory()) {
-        LOG.info("Modifying " + srcStats[i].getPath());
-        DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(),
-            (int)BLOCK_SIZE * 3);
-        break;
-      }
-    }
-    // get file status after modifying file
-    srcStats = getFileStatus(fs, testRoot, srcFiles);
-
-    args = new String[] {"-pugp", "-update", "-blocksperchunk",
-        String.valueOf(chunkSize),
-        nnUri + testSrc, nnUri + testDst + "/" + testSrcRel};
-
-    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
-
-    deldir(fs, testRoot);
-  }
-
-  @Test
-  public void testRecursiveChunkCopy() throws Exception {
-    FileEntry[] srcFiles = {
-        new FileEntry(SRCDAT, true),
-        new FileEntry(SRCDAT + "/file0", false),
-        new FileEntry(SRCDAT + "/dir1", true),
-        new FileEntry(SRCDAT + "/dir2", true),
-        new FileEntry(SRCDAT + "/dir1/file1", false)
-    };
-    chunkCopy(srcFiles);
-  }
-
-  @Test
-  public void testChunkCopyOneFile() throws Exception {
-    FileEntry[] srcFiles = {
-        new FileEntry(SRCDAT, true),
-        new FileEntry(SRCDAT + "/file0", false)
-    };
-    chunkCopy(srcFiles);
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
   }
 
-  @Test
-  public void testDistcpLargeFile() throws Exception {
-    FileEntry[] srcfiles = {
-        new FileEntry(SRCDAT, true),
-        new FileEntry(SRCDAT + "/file", false)
-    };
-
-    final String testRoot = "/testdir";
-    final String testSrcRel = SRCDAT;
-    final String testSrc = testRoot + "/" + testSrcRel;
-    final String testDstRel = DSTDAT;
-    final String testDst = testRoot + "/" + testDstRel;
-
-    String nnUri = FileSystem.getDefaultUri(conf).toString();
-    DistributedFileSystem fs =
-        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
-    fs.mkdirs(new Path(testRoot));
-    fs.mkdirs(new Path(testSrc));
-    fs.mkdirs(new Path(testDst));
-    long chunkSize = 6;
-    createFiles(fs, testRoot, srcfiles, chunkSize);
-
-    String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath();
-    Path srcfile = new Path(srcFileName);
-
-    if(!cluster.getFileSystem().exists(srcfile)){
-      throw new Exception("src not exist");
-    }
-
-    final long srcLen = fs.getFileStatus(srcfile).getLen();
-
-    FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles);
-    for (int i = 0; i < srcfiles.length; i++) {
-      fs.setOwner(srcstats[i].getPath(), "u" + i, null);
+  @AfterClass
+  public static void afterClass() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
     }
-    String[] args = new String[] {
-        "-blocksperchunk",
-        String.valueOf(chunkSize),
-        nnUri + testSrc,
-        nnUri + testDst
-    };
-
-    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
-    ToolRunner.run(conf, new DistCp(), args);
-
-    String realTgtPath = testDst;
-    FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles);
-    assertEquals("File length should match", srcLen,
-        dststat[dststat.length - 1].getLen());
-
-    this.compareFiles(fs,  srcstats[srcstats.length-1],
-        dststat[dststat.length-1]);
-    deldir(fs, testRoot);
   }
 
   @Test
@@ -487,6 +180,7 @@ public class TestDistCpSystem {
     testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false);
   }
 
+
   @Test
   public void testPreserveUserEmptyDir() throws Exception {
     String testRoot = "/testdir." + getMethodName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index acffb76..35778d2 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -407,7 +407,7 @@ public class TestOptionsParser {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
+        + "targetPathExists=true, filtersFile='null'}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97317b1a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 2452d6f..2e9a350 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -81,10 +81,6 @@ public class TestCopyCommitter {
   @Before
   public void createMetaFolder() {
     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
-    // Unset listing file path since the config is shared by
-    // multiple tests, and some test doesn't set it, such as
-    // testNoCommitAction, but the distcp code will check it.
-    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
     Path meta = new Path("/meta");
     try {
       cluster.getFileSystem().mkdirs(meta);
@@ -330,6 +326,7 @@ public class TestCopyCommitter {
       committer.commitJob(jobContext);
       Assert.assertFalse(fs.exists(new Path(workPath)));
       Assert.assertTrue(fs.exists(new Path(finalPath)));
+
     } catch (IOException e) {
       LOG.error("Exception encountered while testing for preserve status", e);
       Assert.fail("Atomic commit failure");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/4] hadoop git commit: HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li.

Posted by yj...@apache.org.
HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/49dc5472
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/49dc5472
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/49dc5472

Branch: refs/heads/branch-2
Commit: 49dc5472529ecf58cadfd041b378b9f99b4b979f
Parents: 97317b1
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Fri Apr 14 10:14:02 2017 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Wed May 24 18:54:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  22 +-
 .../org/apache/hadoop/tools/CopyListing.java    |  37 +-
 .../hadoop/tools/CopyListingFileStatus.java     |  87 ++++-
 .../java/org/apache/hadoop/tools/DistCp.java    |  52 +++
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  10 +
 .../org/apache/hadoop/tools/DistCpOptions.java  |  22 +-
 .../org/apache/hadoop/tools/OptionsParser.java  |  36 +-
 .../apache/hadoop/tools/SimpleCopyListing.java  |  83 +++--
 .../hadoop/tools/mapred/CopyCommitter.java      | 174 ++++++++-
 .../apache/hadoop/tools/mapred/CopyMapper.java  |  40 +-
 .../tools/mapred/RetriableFileCopyCommand.java  |  26 +-
 .../tools/mapred/UniformSizeInputFormat.java    |   5 +-
 .../apache/hadoop/tools/util/DistCpUtils.java   | 111 +++++-
 .../src/site/markdown/DistCp.md.vm              |   1 +
 .../apache/hadoop/tools/TestDistCpOptions.java  |   2 +-
 .../apache/hadoop/tools/TestDistCpSystem.java   | 368 +++++++++++++++++--
 .../apache/hadoop/tools/TestOptionsParser.java  |   2 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java  |   5 +-
 18 files changed, 972 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 033b81d..087d8f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -841,7 +841,27 @@ public class DFSTestUtil {
       out.write(toAppend);
     }
   }
-  
+
+  /**
+   * Append specified length of bytes to a given file, starting with new block.
+   * @param fs The file system
+   * @param p Path of the file to append
+   * @param length Length of bytes to append to the file
+   * @throws IOException
+   */
+  public static void appendFileNewBlock(DistributedFileSystem fs,
+      Path p, int length) throws IOException {
+    assert fs.exists(p);
+    assert length >= 0;
+    byte[] toAppend = new byte[length];
+    Random random = new Random();
+    random.nextBytes(toAppend);
+    try (FSDataOutputStream out = fs.append(p,
+        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
+      out.write(toAppend);
+    }
+  }
+
   /**
    * @return url content as string (UTF-8 encoding assumed)
    */
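
The new appendFileNewBlock helper is what lets the TestDistCpSystem cases later in this patch build files whose last block has a different length, so chunk boundaries do not line up with block boundaries. A hedged usage sketch (sizes, replication factor and the path are made up for illustration):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    final class VariableBlockFileSketch {
      // Write one and a half blocks normally, then force the next segment
      // onto a brand-new block via the helper added above.
      static void createVariableBlockFile(DistributedFileSystem fs, Path p,
          long blockSize) throws IOException {
        DFSTestUtil.createFile(fs, p, 128, blockSize + blockSize / 2,
            blockSize, (short) 2, 0L);
        DFSTestUtil.appendFileNewBlock(fs, p, (int) blockSize);
      }
    }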

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 481aa61..9ebf9d2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -145,12 +145,22 @@ public abstract class CopyListing extends Configured {
     Configuration config = getConf();
     FileSystem fs = pathToListFile.getFileSystem(config);
 
-    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+    final boolean splitLargeFile = options.splitLargeFile();
+
+    // When splitLargeFile is enabled, we don't randomize the copylist
+    // earlier, so we don't do the sorting here. For a file that has
+    // multiple entries due to split, we check here that their
+    // <chunkOffset, chunkLength> is continuous.
+    //
+    Path checkPath = splitLargeFile?
+        pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile);
 
     SequenceFile.Reader reader = new SequenceFile.Reader(
-                          config, SequenceFile.Reader.file(sortedList));
+                          config, SequenceFile.Reader.file(checkPath));
     try {
       Text lastKey = new Text("*"); //source relative path can never hold *
+      long lastChunkOffset = -1;
+      long lastChunkLength = -1;
       CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
 
       Text currentKey = new Text();
@@ -161,8 +171,21 @@ public abstract class CopyListing extends Configured {
         if (currentKey.equals(lastKey)) {
           CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
           reader.getCurrentValue(currentFileStatus);
-          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
-              currentFileStatus.getPath() + " would cause duplicates. Aborting");
+          if (!splitLargeFile) {
+            throw new DuplicateFileException("File " + lastFileStatus.getPath()
+                + " and " + currentFileStatus.getPath()
+                + " would cause duplicates. Aborting");
+          } else {
+            if (lastChunkOffset + lastChunkLength !=
+                currentFileStatus.getChunkOffset()) {
+              throw new InvalidInputException("File " + lastFileStatus.getPath()
+                  + " " + lastChunkOffset + "," + lastChunkLength
+                  + " and " + currentFileStatus.getPath()
+                  + " " + currentFileStatus.getChunkOffset() + ","
+                  + currentFileStatus.getChunkLength()
+                  + " are not continuous. Aborting");
+            }
+          }
         }
         reader.getCurrentValue(lastFileStatus);
         if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
@@ -181,8 +204,12 @@ public abstract class CopyListing extends Configured {
             xAttrSupportCheckFsSet.add(lastFsUri);
           }
         }
-        lastKey.set(currentKey);
 
+        lastKey.set(currentKey);
+        if (splitLargeFile) {
+          lastChunkOffset = lastFileStatus.getChunkOffset();
+          lastChunkLength = lastFileStatus.getChunkLength();
+        }
         if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
           LOG.debug("Copy list entry " + idx + ": " +
                   lastFileStatus.getPath().toUri().getPath());
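
The comment in this hunk states the new invariant: with -blocksperchunk the listing is not randomized, and the several entries of a split file must appear in order with contiguous <chunkOffset, chunkLength> ranges. A standalone sketch of that continuity check over an ordered list of {offset, length} pairs:

    final class ChunkContinuitySketch {
      // True iff every chunk starts exactly where the previous one ended.
      static boolean isContinuous(long[][] chunks) {
        for (int i = 1; i < chunks.length; i++) {
          long prevEnd = chunks[i - 1][0] + chunks[i - 1][1];
          if (chunks[i][0] != prevEnd) {
            return false;
          }
        }
        return true;
      }
    }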

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
index 2b1e7e4..5395fa9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
@@ -74,6 +74,14 @@ public final class CopyListingFileStatus implements Writable {
   private List<AclEntry> aclEntries;
   private Map<String, byte[]> xAttrs;
 
+  // <chunkOffset, chunkLength> represents the offset and length of a file
+  // chunk in number of bytes.
+  // used when splitting a large file to chunks to copy in parallel.
+  // If a file is not large enough to split, chunkOffset would be 0 and
+  // chunkLength would be the length of the file.
+  private long chunkOffset = 0;
+  private long chunkLength = Long.MAX_VALUE;
+
   /**
    * Default constructor.
    */
@@ -96,11 +104,32 @@ public final class CopyListingFileStatus implements Writable {
         fileStatus.getPath());
   }
 
+  public CopyListingFileStatus(FileStatus fileStatus,
+      long chunkOffset, long chunkLength) {
+    this(fileStatus.getLen(), fileStatus.isDirectory(),
+        fileStatus.getReplication(), fileStatus.getBlockSize(),
+        fileStatus.getModificationTime(), fileStatus.getAccessTime(),
+        fileStatus.getPermission(), fileStatus.getOwner(),
+        fileStatus.getGroup(),
+        fileStatus.getPath());
+    this.chunkOffset = chunkOffset;
+    this.chunkLength = chunkLength;
+  }
+
   @SuppressWarnings("checkstyle:parameternumber")
   public CopyListingFileStatus(long length, boolean isdir,
       int blockReplication, long blocksize, long modificationTime,
       long accessTime, FsPermission permission, String owner, String group,
       Path path) {
+    this(length, isdir, blockReplication, blocksize, modificationTime,
+        accessTime, permission, owner, group, path, 0, Long.MAX_VALUE);
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public CopyListingFileStatus(long length, boolean isdir,
+      int blockReplication, long blocksize, long modificationTime,
+      long accessTime, FsPermission permission, String owner, String group,
+      Path path, long chunkOffset, long chunkLength) {
     this.length = length;
     this.isdir = isdir;
     this.blockReplication = (short)blockReplication;
@@ -117,6 +146,23 @@ public final class CopyListingFileStatus implements Writable {
     this.owner = (owner == null) ? "" : owner;
     this.group = (group == null) ? "" : group;
     this.path = path;
+    this.chunkOffset = chunkOffset;
+    this.chunkLength = chunkLength;
+  }
+
+  public CopyListingFileStatus(CopyListingFileStatus other) {
+    this.length = other.length;
+    this.isdir = other.isdir;
+    this.blockReplication = other.blockReplication;
+    this.blocksize = other.blocksize;
+    this.modificationTime = other.modificationTime;
+    this.accessTime = other.accessTime;
+    this.permission = other.permission;
+    this.owner = other.owner;
+    this.group = other.group;
+    this.path = new Path(other.path.toUri());
+    this.chunkOffset = other.chunkOffset;
+    this.chunkLength = other.chunkLength;
   }
 
   public Path getPath() {
@@ -196,6 +242,31 @@ public final class CopyListingFileStatus implements Writable {
     this.xAttrs = xAttrs;
   }
 
+  public long getChunkOffset() {
+    return chunkOffset;
+  }
+
+  public void setChunkOffset(long offset) {
+    this.chunkOffset = offset;
+  }
+
+  public long getChunkLength() {
+    return chunkLength;
+  }
+
+  public void setChunkLength(long chunkLength) {
+    this.chunkLength = chunkLength;
+  }
+
+  public boolean isSplit() {
+    return getChunkLength() != Long.MAX_VALUE &&
+        getChunkLength() != getLen();
+  }
+
+  public long getSizeToCopy() {
+    return isSplit()? getChunkLength() : getLen();
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN);
@@ -240,6 +311,9 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       out.writeInt(NO_XATTRS);
     }
+
+    out.writeLong(chunkOffset);
+    out.writeLong(chunkLength);
   }
 
   @Override
@@ -288,6 +362,9 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       xAttrs = null;
     }
+
+    chunkOffset = in.readLong();
+    chunkLength = in.readLong();
   }
 
   @Override
@@ -313,8 +390,14 @@ public final class CopyListingFileStatus implements Writable {
   public String toString() {
     StringBuilder sb = new StringBuilder(super.toString());
     sb.append('{');
-    sb.append("aclEntries = " + aclEntries);
-    sb.append(", xAttrs = " + xAttrs);
+    sb.append(this.getPath().toString());
+    sb.append(" length = ").append(this.getLen());
+    sb.append(" aclEntries = ").append(aclEntries);
+    sb.append(", xAttrs = ").append(xAttrs);
+    if (isSplit()) {
+      sb.append(", chunkOffset = ").append(this.getChunkOffset());
+      sb.append(", chunkLength = ").append(this.getChunkLength());
+    }
     sb.append('}');
     return sb.toString();
   }
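
The chunkOffset/chunkLength fields above ride along in the existing Writable format, and isSplit()/getSizeToCopy() are the accessors the rest of the patch keys off. A small illustration of the intended semantics using the new long-argument constructor (all numbers, names and paths are invented for the example):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    final class ChunkStatusSketch {
      static void demo() {
        long fileLen = 300L * 1024 * 1024;   // whole-file length
        long chunkLen = 128L * 1024 * 1024;  // length of the first chunk
        CopyListingFileStatus firstChunk = new CopyListingFileStatus(
            fileLen, false, 3, 128L * 1024 * 1024, 0L, 0L,
            FsPermission.getFileDefault(), "user", "group",
            new Path("/src/bigfile"), 0L, chunkLen);
        // chunkLength differs from the file length, so this entry is a split chunk
        assert firstChunk.isSplit();
        assert firstChunk.getSizeToCopy() == chunkLen;
      }
    }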

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index 7b0d9f2..ddf67ff 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.CopyListing.*;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@@ -133,6 +134,7 @@ public class DistCp extends Configured implements Tool {
     
     try {
       inputOptions = (OptionsParser.parse(argv));
+      setOptionsForSplitLargeFile();
       setTargetPathExists();
       LOG.info("Input Options: " + inputOptions);
     } catch (Throwable e) {
@@ -234,6 +236,56 @@ public class DistCp extends Configured implements Tool {
     getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
         targetExists);
   }
+
+  /**
+   * Check if concat is supported by fs.
+   * Throws UnsupportedOperationException if not.
+   */
+  private void checkConcatSupport(FileSystem fs) {
+    try {
+      Path[] src = null;
+      Path tgt = null;
+      fs.concat(tgt, src);
+    } catch (UnsupportedOperationException use) {
+      throw new UnsupportedOperationException(
+          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() +
+          " is not supported since the target file system doesn't" +
+          " support concat.", use);
+    } catch (Exception e) {
+      // Ignore other exception
+    }
+  }
+
+  /**
+   * Set up needed options for splitting large files.
+   */
+  private void setOptionsForSplitLargeFile() throws IOException {
+    if (!inputOptions.splitLargeFile()) {
+      return;
+    }
+    Path target = inputOptions.getTargetPath();
+    FileSystem targetFS = target.getFileSystem(getConf());
+    checkConcatSupport(targetFS);
+
+    LOG.info("Enabling preserving blocksize since "
+        + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
+    inputOptions.preserve(FileAttribute.BLOCKSIZE);
+
+    LOG.info("Set " +
+        DistCpOptionSwitch.APPEND.getSwitch()
+        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+        + " is passed.");
+    inputOptions.setAppend(false);
+
+    LOG.info("Set " +
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES
+        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+        + " is passed.");
+    getConf().setBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+  }
+
+
   /**
    * Create Job object for submitting it, with all the configuration
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index b000791..e76a48e 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -178,6 +178,16 @@ public enum DistCpOptionSwitch {
       new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
               "copied to <= n bytes")),
 
+  BLOCKS_PER_CHUNK("",
+      new Option("blocksperchunk", true, "If set to a positive value, files"
+          + "with more blocks than this value will be split into chunks of "
+          + "<blocksperchunk> blocks to be transferred in parallel, and "
+          + "reassembled on the destination. By default, <blocksperchunk> is "
+          + "0 and the files will be transmitted in their entirety without "
+          + "splitting. This switch is only applicable when the source file "
+          + "system implements getBlockLocations method and the target file "
+          + "system implements concat method")),
+
   /**
    * Specify bandwidth per map in MB
    */
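
For reference, the new switch is driven the same way the system tests in this patch drive DistCp: pass -blocksperchunk with a positive block count on the command line or through ToolRunner. A hedged sketch of such an invocation (chunk size and cluster paths are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.util.ToolRunner;

    final class BlocksPerChunkUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Files larger than 8 blocks are split into 8-block chunks, copied in
        // parallel, and reassembled on the destination.
        String[] distcpArgs = {
            "-blocksperchunk", "8",
            "hdfs://src-nn/data/large",    // illustrative source path
            "hdfs://dst-nn/data/large"     // illustrative target path
        };
        ToolRunner.run(conf, new DistCp(), distcpArgs);
      }
    }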

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index c61816a..2efb96b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -99,7 +99,11 @@ public class DistCpOptions {
   // targetPathExist is a derived field, it's initialized in the
   // beginning of distcp.
   private boolean targetPathExists = true;
-  
+
+  // Size of chunk in number of blocks when splitting large file into chunks
+  // to copy in parallel. Default is 0 and files are not split.
+  private int blocksPerChunk = 0;
+
   public static enum FileAttribute{
     REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
 
@@ -169,6 +173,7 @@ public class DistCpOptions {
       this.targetPath = that.getTargetPath();
       this.targetPathExists = that.getTargetPathExists();
       this.filtersFile = that.getFiltersFile();
+      this.blocksPerChunk = that.blocksPerChunk;
     }
   }
 
@@ -623,6 +628,18 @@ public class DistCpOptions {
     this.filtersFile = filtersFilename;
   }
 
+  public final void setBlocksPerChunk(int csize) {
+    this.blocksPerChunk = csize;
+  }
+
+  public final int getBlocksPerChunk() {
+    return blocksPerChunk;
+  }
+
+  public final boolean splitLargeFile() {
+    return blocksPerChunk > 0;
+  }
+
   public void validate(DistCpOptionSwitch option, boolean value) {
 
     boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@@ -717,6 +734,8 @@ public class DistCpOptions {
       DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
           filtersFile);
     }
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
+        String.valueOf(blocksPerChunk));
   }
 
   /**
@@ -753,6 +772,7 @@ public class DistCpOptions {
         ", targetPath=" + targetPath +
         ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
+        ", blocksPerChunk=" + blocksPerChunk +
         '}';
   }
 

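For context, a minimal sketch of exercising the new option programmatically; it is illustrative only and not part of this patch. The source/target URIs and the chunk size of 8 blocks are placeholders, and it assumes the existing branch-2 DistCpOptions(List<Path>, Path) and DistCp(Configuration, DistCpOptions) constructors:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

public class BlocksPerChunkExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistCpOptions options = new DistCpOptions(
        Arrays.asList(new Path("hdfs://nn1:8020/src/data")),
        new Path("hdfs://nn2:8020/dst/data"));
    // Files with more than 8 blocks are split into 8-block chunks,
    // copied in parallel, and concatenated again on the destination.
    options.setBlocksPerChunk(8);
    DistCp distCp = new DistCp(conf, options);
    Job job = distCp.execute();  // blocks until the DistCp job finishes by default
    System.exit(job.isSuccessful() ? 0 : 1);
  }
}
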
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index af3cb92..c68102d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -184,9 +184,42 @@ public class OptionsParser {
           DistCpOptionSwitch.FILTERS.getSwitch()));
     }
 
+    parseBlocksPerChunk(command, option);
+
     return option;
   }
 
+
+  /**
+   * A helper method to parse chunk size in number of blocks.
+   * Used when breaking a large file into chunks to copy in parallel.
+   *
+   * @param command command line arguments
+   */
+  private static void parseBlocksPerChunk(CommandLine command,
+      DistCpOptions option) {
+    boolean hasOption =
+        command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
+    LOG.info("parseChunkSize: " +
+        DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption);
+    if (hasOption) {
+      String chunkSizeString = getVal(command,
+          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
+      try {
+        int csize = Integer.parseInt(chunkSizeString);
+        if (csize < 0) {
+          csize = 0;
+        }
+        LOG.info("Set distcp blocksPerChunk to " + csize);
+        option.setBlocksPerChunk(csize);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("blocksPerChunk is invalid: "
+            + chunkSizeString, e);
+      }
+    }
+  }
+
   /**
    * parseSizeLimit is a helper method for parsing the deprecated
    * argument SIZE_LIMIT.
@@ -221,8 +254,7 @@ public class OptionsParser {
                               DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
       try {
         Integer.parseInt(fileLimitString);
-      }
-      catch (NumberFormatException e) {
+      } catch (NumberFormatException e) {
         throw new IllegalArgumentException("File-limit is invalid: "
                                             + fileLimitString, e);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 105e4f2..af91347 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.tools;
 
 import com.google.common.collect.Lists;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.LinkedList;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -240,10 +242,10 @@ public class SimpleCopyListing extends CopyListing {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
-    CopyListingFileStatus fileCopyListingStatus =
+    LinkedList<CopyListingFileStatus> fileCopyListingStatus =
         DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
-            preserveAcls, preserveXAttrs, preserveRawXAttrs);
-
+            preserveAcls, preserveXAttrs, preserveRawXAttrs,
+            options.getBlocksPerChunk());
     writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
         sourceRoot, options);
   }
@@ -348,9 +350,10 @@ public class SimpleCopyListing extends CopyListing {
         FileStatus[] sourceFiles = sourceFS.listStatus(path);
         boolean explore = (sourceFiles != null && sourceFiles.length > 0);
         if (!explore || rootStatus.isDirectory()) {
-          CopyListingFileStatus rootCopyListingStatus =
-            DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
-                preserveAcls, preserveXAttrs, preserveRawXAttrs);
+          LinkedList<CopyListingFileStatus> rootCopyListingStatus =
+              DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
+                  preserveAcls, preserveXAttrs, preserveRawXAttrs,
+                  options.getBlocksPerChunk());
           writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
               sourcePathRoot, options);
         }
@@ -360,20 +363,20 @@ public class SimpleCopyListing extends CopyListing {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
             }
-            CopyListingFileStatus sourceCopyListingStatus =
-              DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
-                  preserveAcls && sourceStatus.isDirectory(),
-                  preserveXAttrs && sourceStatus.isDirectory(),
-                  preserveRawXAttrs && sourceStatus.isDirectory());
-            if (randomizeFileListing) {
-              addToFileListing(statusList,
-                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
-                  fileListWriter);
-            } else {
-              writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                  sourcePathRoot);
+            LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
+                DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+                    preserveAcls && sourceStatus.isDirectory(),
+                    preserveXAttrs && sourceStatus.isDirectory(),
+                    preserveRawXAttrs && sourceStatus.isDirectory(),
+                    options.getBlocksPerChunk());
+            for (CopyListingFileStatus fs : sourceCopyListingStatus) {
+              if (randomizeFileListing) {
+                addToFileListing(statusList,
+                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+              } else {
+                writeToFileListing(fileListWriter, fs, sourcePathRoot);
+              }
             }
-
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
@@ -641,18 +644,20 @@ public class SimpleCopyListing extends CopyListing {
             LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
           }
           if (workResult.getSuccess()) {
-            CopyListingFileStatus childCopyListingStatus =
+            LinkedList<CopyListingFileStatus> childCopyListingStatus =
               DistCpUtils.toCopyListingFileStatus(sourceFS, child,
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
-                preserveRawXattrs && child.isDirectory());
-            if (randomizeFileListing) {
-              addToFileListing(fileStatuses,
-                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
-                  fileListWriter);
-            } else {
-              writeToFileListing(fileListWriter, childCopyListingStatus,
-                  sourcePathRoot);
+                preserveRawXattrs && child.isDirectory(),
+                options.getBlocksPerChunk());
+
+            for (CopyListingFileStatus fs : childCopyListingStatus) {
+              if (randomizeFileListing) {
+                addToFileListing(fileStatuses,
+                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+              } else {
+                writeToFileListing(fileListWriter, fs, sourcePathRoot);
+              }
             }
           }
           if (retry < maxRetries) {
@@ -675,19 +680,21 @@ public class SimpleCopyListing extends CopyListing {
   }
 
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
-      CopyListingFileStatus fileStatus, Path sourcePathRoot,
+      LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
       DistCpOptions options) throws IOException {
     boolean syncOrOverwrite = options.shouldSyncFolder() ||
         options.shouldOverwrite();
-    if (fileStatus.getPath().equals(sourcePathRoot) && 
-        fileStatus.isDirectory() && syncOrOverwrite) {
-      // Skip the root-paths when syncOrOverwrite
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip " + fileStatus.getPath());
-      }      
-      return;
+    for (CopyListingFileStatus fs : fileStatus) {
+      if (fs.getPath().equals(sourcePathRoot) &&
+          fs.isDirectory() && syncOrOverwrite) {
+        // Skip the root-paths when syncOrOverwrite
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip " + fs.getPath());
+        }
+        return;
+      }
+      writeToFileListing(fileListWriter, fs, sourcePathRoot);
     }
-    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot);
   }
 
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
@@ -707,7 +714,7 @@ public class SimpleCopyListing extends CopyListing {
     fileListWriter.sync();
 
     if (!fileStatus.isDirectory()) {
-      totalBytesToCopy += fileStatus.getLen();
+      totalBytesToCopy += fileStatus.getSizeToCopy();
     } else {
       totalDirs++;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 75cefb4..6ddaab9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -34,14 +35,17 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.CopyListing;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.GlobbedCopyListing;
 import org.apache.hadoop.tools.util.DistCpUtils;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -63,7 +67,8 @@ public class CopyCommitter extends FileOutputCommitter {
   private boolean syncFolder = false;
   private boolean overwrite = false;
   private boolean targetPathExists = true;
-  
+  private boolean ignoreFailures = false;
+
   /**
    * Create a output committer
    *
@@ -82,8 +87,13 @@ public class CopyCommitter extends FileOutputCommitter {
     Configuration conf = jobContext.getConfiguration();
     syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false);
     overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false);
-    targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
-    
+    targetPathExists = conf.getBoolean(
+        DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
+    ignoreFailures = conf.getBoolean(
+        DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+
+    concatFileChunks(conf);
+
     super.commitJob(jobContext);
 
     cleanupTempFiles(jobContext);
@@ -169,9 +179,112 @@ public class CopyCommitter extends FileOutputCommitter {
     }
   }
 
+  private boolean isFileNotFoundException(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return true;
+    }
+
+    if (e instanceof RemoteException) {
+      return ((RemoteException)e).unwrapRemoteException()
+          instanceof FileNotFoundException;
+    }
+
+    return false;
+  }
+
+  /**
+   * Concat chunk files for the same file into one.
+   * Iterate through copy listing, identify chunk files for the same file,
+   * concat them into one.
+   */
+  private void concatFileChunks(Configuration conf) throws IOException {
+
+    LOG.info("concat file chunks ...");
+
+    String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
+    if (spath == null || spath.isEmpty()) {
+      return;
+    }
+    Path sourceListing = new Path(spath);
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+                                      SequenceFile.Reader.file(sourceListing));
+    Path targetRoot =
+        new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    try {
+      CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+      Text srcRelPath = new Text();
+      CopyListingFileStatus lastFileStatus = null;
+      LinkedList<Path> allChunkPaths = new LinkedList<Path>();
+
+      // Iterate over every source path that was copied.
+      while (sourceReader.next(srcRelPath, srcFileStatus)) {
+        if (srcFileStatus.isDirectory()) {
+          continue;
+        }
+        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+        Path targetFileChunkPath =
+            DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("  add " + targetFileChunkPath + " to concat.");
+        }
+        allChunkPaths.add(targetFileChunkPath);
+        if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength()
+            == srcFileStatus.getLen()) {
+          // This is the last chunk of the splits, consolidate allChunkPaths
+          try {
+            concatFileChunks(conf, targetFile, allChunkPaths);
+          } catch (IOException e) {
+            // If the concat failed because a chunk file doesn't exist,
+            // then we assume that the CopyMapper has skipped copying this
+            // file, and we ignore the exception here.
+            // If a chunk file should have been created but it was not, then
+            // the CopyMapper would have failed.
+            if (!isFileNotFoundException(e)) {
+              String emsg = "Failed to concat chunk files for " + targetFile;
+              if (!ignoreFailures) {
+                throw new IOException(emsg, e);
+              } else {
+                LOG.warn(emsg, e);
+              }
+            }
+          }
+          allChunkPaths.clear();
+          lastFileStatus = null;
+        } else {
+          if (lastFileStatus == null) {
+            lastFileStatus = new CopyListingFileStatus(srcFileStatus);
+          } else {
+            // Two neighboring chunks have to be consecutive ones for the same
+            // file, for them to be merged
+            if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) ||
+                srcFileStatus.getChunkOffset() !=
+                (lastFileStatus.getChunkOffset() +
+                lastFileStatus.getChunkLength())) {
+              String emsg = "Inconsistent sequence file: current " +
+                  "chunk file " + srcFileStatus + " doesnt match prior " +
+                  "entry " + lastFileStatus;
+              if (!ignoreFailures) {
+                throw new IOException(emsg);
+              } else {
+                LOG.warn(emsg + ", skipping concat this set.");
+              }
+            } else {
+              lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset());
+              lastFileStatus.setChunkLength(srcFileStatus.getChunkLength());
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+    }
+  }
+
   // This method changes the target-directories' file-attributes (owner,
   // user/group permissions, etc.) based on the corresponding source directories.
-  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+  private void preserveFileAttributesForDirectories(Configuration conf)
+      throws IOException {
     String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
     final boolean syncOrOverwrite = syncFolder || overwrite;
 
@@ -325,4 +438,57 @@ public class CopyCommitter extends FileOutputCommitter {
         ", Unable to move to " + finalDir);
     }
   }
+
+  /**
+   * Concat the passed chunk files into one and rename it the targetFile.
+   */
+  private void concatFileChunks(Configuration conf, Path targetFile,
+      LinkedList<Path> allChunkPaths) throws IOException {
+    if (allChunkPaths.size() == 1) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat " + targetFile + " allChunkSize+ "
+          + allChunkPaths.size());
+    }
+    FileSystem dstfs = targetFile.getFileSystem(conf);
+
+    Path firstChunkFile = allChunkPaths.removeFirst();
+    Path[] restChunkFiles = new Path[allChunkPaths.size()];
+    allChunkPaths.toArray(restChunkFiles);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile));
+      int i = 0;
+      for (Path f : restChunkFiles) {
+        LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f));
+        ++i;
+      }
+    }
+    dstfs.concat(firstChunkFile, restChunkFiles);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
+    }
+    rename(dstfs, firstChunkFile, targetFile);
+  }
+
+  /**
+   * Rename tmp to dst on destFileSys.
+   * @param destFileSys the file system
+   * @param tmp the source path
+   * @param dst the destination path
+   * @throws IOException if renaming failed
+   */
+  private static void rename(FileSystem destFileSys, Path tmp, Path dst)
+      throws IOException {
+    try {
+      if (destFileSys.exists(dst)) {
+        destFileSys.delete(dst, true);
+      }
+      destFileSys.rename(tmp, dst);
+    } catch (IOException ioe) {
+      throw new IOException("Fail to rename tmp file (=" + tmp
+          + ") to destination file (=" + dst + ")", ioe);
+    }
+  }
+
 }

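To make the commit-time reassembly concrete, here is a small standalone sketch of the chunk naming and concat order described above; the target path, chunk length, and file length are hypothetical. CopyMapper writes each chunk to a sibling of the target named <target>.____distcpSplit____<offset>.<length> (see DistCpUtils.getSplitChunkPath later in this patch), and concatFileChunks concats the remaining chunks onto the first one before renaming it to the target:

import java.util.ArrayList;
import java.util.List;

public class ChunkNamingSketch {
  public static void main(String[] args) {
    String target = "/dst/data/file1";
    long chunkLength = 128L * 1024 * 1024;   // hypothetical 128 MB chunks
    long fileLength = 3 * chunkLength;       // hypothetical 384 MB file
    List<String> chunkPaths = new ArrayList<>();
    for (long offset = 0; offset < fileLength; offset += chunkLength) {
      // mirrors the <target>.____distcpSplit____<offset>.<length> naming
      chunkPaths.add(target + ".____distcpSplit____" + offset + "." + chunkLength);
    }
    // concat the remaining chunks onto the first one, then rename it
    String firstChunk = chunkPaths.remove(0);
    System.out.println("concat " + chunkPaths + " onto " + firstChunk);
    System.out.println("rename " + firstChunk + " to " + target);
  }
}
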
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index 41c5d78..53a95ee 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -219,10 +219,12 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         sourceFS = sourcePath.getFileSystem(conf);
         final boolean preserveXAttrs =
             fileAttributes.contains(FileAttribute.XATTR);
-        sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
-          sourceFS.getFileStatus(sourcePath),
-          fileAttributes.contains(FileAttribute.ACL), 
-          preserveXAttrs, preserveRawXattrs);
+        sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS,
+            sourceFS.getFileStatus(sourcePath),
+            fileAttributes.contains(FileAttribute.ACL),
+            preserveXAttrs, preserveRawXattrs,
+            sourceFileStatus.getChunkOffset(),
+            sourceFileStatus.getChunkLength());
       } catch (FileNotFoundException e) {
         throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
       }
@@ -236,7 +238,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
           LOG.debug("Path could not be found: " + target, ignore);
       }
 
-      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
+      if (targetStatus != null &&
+          (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
         throw new IOException("Can't replace " + target + ". Target is " +
             getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
       }
@@ -246,19 +249,28 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         return;
       }
 
-      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus);
+      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target,
+          targetStatus);
+
+      Path tmpTarget = target;
       if (action == FileAction.SKIP) {
         LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                  + " to " + target);
         updateSkipCounters(context, sourceCurrStatus);
         context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
+
       } else {
-        copyFileWithRetry(description, sourceCurrStatus, target, context,
+        if (sourceCurrStatus.isSplit()) {
+          tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
+        }
+        copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context,
             action, fileAttributes);
       }
-
-      DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
-          fileAttributes, preserveRawXattrs);
+      DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
+          sourceCurrStatus, fileAttributes, preserveRawXattrs);
     } catch (IOException exception) {
       handleFailures(exception, sourceFileStatus, target, context);
     }
@@ -323,8 +335,12 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private void handleFailures(IOException exception,
       CopyListingFileStatus sourceFileStatus, Path target, Context context)
       throws IOException, InterruptedException {
-    LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
-                target, exception);
+    LOG.error("Failure in copying " + sourceFileStatus.getPath() +
+        (sourceFileStatus.isSplit()? ","
+            + " offset=" + sourceFileStatus.getChunkOffset()
+            + " chunkLength=" + sourceFileStatus.getChunkLength()
+            : "") +
+        " to " + target, exception);
 
     if (ignoreFailures &&
         ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 1777364..58a51af 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -118,17 +118,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
           .getFileChecksum(sourcePath) : null;
 
-      final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
-          target).getLen() : 0;
+      long offset = (action == FileAction.APPEND) ?
+          targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
       long bytesRead = copyToFile(targetPath, targetFS, source,
           offset, context, fileAttributes, sourceChecksum);
 
-      compareFileLengths(source, targetPath, configuration, bytesRead
-          + offset);
+      if (!source.isSplit()) {
+        compareFileLengths(source, targetPath, configuration, bytesRead
+            + offset);
+      }
       //At this point, src&dest lengths are same. if length==0, we skip checksum
       if ((bytesRead != 0) && (!skipCrc)) {
-        compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
-            targetFS, targetPath);
+        if (!source.isSplit()) {
+          compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
+              targetFS, targetPath);
+        }
       }
       // it's not append case, thus we first write to a temporary file, rename
       // it to the target path.
@@ -246,16 +250,26 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     ThrottledInputStream inStream = null;
     long totalBytesRead = 0;
 
+    long chunkLength = source2.getChunkLength();
+    boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
       int bytesRead = readBytes(inStream, buf, sourceOffset);
       while (bytesRead >= 0) {
+        if (chunkLength > 0 &&
+            (totalBytesRead + bytesRead) >= chunkLength) {
+          bytesRead = (int)(chunkLength - totalBytesRead);
+          finished = true;
+        }
         totalBytesRead += bytesRead;
         if (action == FileAction.APPEND) {
           sourceOffset += bytesRead;
         }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, source2);
+        if (finished) {
+          break;
+        }
         bytesRead = readBytes(inStream, buf, sourceOffset);
       }
       outStream.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
index 3e86d09..d1c18ea 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
@@ -99,7 +99,8 @@ public class UniformSizeInputFormat
       while (reader.next(srcRelPath, srcFileStatus)) {
         // If adding the current file would cause the bytes per map to exceed
         // limit. Add the current file to new split
-        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
+        if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit
+            && lastPosition != 0) {
           FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
               lastPosition - lastSplitStart, null);
           if (LOG.isDebugEnabled()) {
@@ -109,7 +110,7 @@ public class UniformSizeInputFormat
           lastSplitStart = lastPosition;
           currentSplitSize = 0;
         }
-        currentSplitSize += srcFileStatus.getLen();
+        currentSplitSize += srcFileStatus.getChunkLength();
         lastPosition = reader.getPosition();
       }
       if (lastPosition > lastSplitStart) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index c308e6f..29715bb 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.tools.util;
 
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -44,6 +47,7 @@ import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -116,7 +120,7 @@ public class DistCpUtils {
    * @return Class implementing the strategy specified in options.
    */
   public static Class<? extends InputFormat> getStrategy(Configuration conf,
-                                                                 DistCpOptions options) {
+      DistCpOptions options) {
     String confLabel = "distcp."
         + StringUtils.toLowerCase(options.getCopyStrategy())
         + ".strategy" + ".impl";
@@ -293,6 +297,86 @@ public class DistCpUtils {
   }
 
   /**
+   * Converts a FileStatus to a list of CopyListingFileStatus.
+   * The resulting list contains either one CopyListingFileStatus per chunk of
+   * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there
+   * are more blocks in the file than blocksPerChunk), or a single
+   * CopyListingFileStatus for the entire file (if file-size is too small to
+   * split).
+   * If preserving ACLs, populates the CopyListingFileStatus with the ACLs.
+   * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs.
+   *
+   * @param fileSystem FileSystem containing the file
+   * @param fileStatus FileStatus of file
+   * @param preserveAcls boolean true if preserving ACLs
+   * @param preserveXAttrs boolean true if preserving XAttrs
+   * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
+   * @param blocksPerChunk size of chunks when copying chunks in parallel
+   * @return list of CopyListingFileStatus
+   * @throws IOException if there is an I/O error
+   */
+  public static LinkedList<CopyListingFileStatus> toCopyListingFileStatus(
+      FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
+      boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk)
+          throws IOException {
+    LinkedList<CopyListingFileStatus> copyListingFileStatus =
+        new LinkedList<CopyListingFileStatus>();
+
+    final CopyListingFileStatus clfs = toCopyListingFileStatusHelper(
+        fileSystem, fileStatus, preserveAcls,
+        preserveXAttrs, preserveRawXAttrs,
+        0, fileStatus.getLen());
+    final long blockSize = fileStatus.getBlockSize();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("toCopyListing: " + fileStatus + " chunkSize: "
+          + blocksPerChunk + " isDFS: " +
+          (fileSystem instanceof DistributedFileSystem));
+    }
+    if ((blocksPerChunk > 0) &&
+        !fileStatus.isDirectory() &&
+        (fileStatus.getLen() > blockSize * blocksPerChunk)) {
+      // split only when the file size is larger than the intended chunk size
+      final BlockLocation[] blockLocations;
+      blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0,
+            fileStatus.getLen());
+
+      int numBlocks = blockLocations.length;
+      long curPos = 0;
+      if (numBlocks <= blocksPerChunk) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("  add file " + clfs);
+        }
+        copyListingFileStatus.add(clfs);
+      } else {
+        int i = 0;
+        while (i < numBlocks) {
+          long curLength = 0;
+          for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) {
+            curLength += blockLocations[i].getLength();
+          }
+          if (curLength > 0) {
+            CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs);
+            clfs1.setChunkOffset(curPos);
+            clfs1.setChunkLength(curLength);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("  add file chunk " + clfs1);
+            }
+            copyListingFileStatus.add(clfs1);
+            curPos += curLength;
+          }
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("  add file/dir " + clfs);
+      }
+      copyListingFileStatus.add(clfs);
+    }
+
+    return copyListingFileStatus;
+  }
+
+  /**
    * Converts a FileStatus to a CopyListingFileStatus.  If preserving ACLs,
    * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
    * populates the CopyListingFileStatus with the XAttrs.
@@ -302,13 +386,17 @@ public class DistCpUtils {
    * @param preserveAcls boolean true if preserving ACLs
    * @param preserveXAttrs boolean true if preserving XAttrs
    * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
+   * @param chunkOffset chunk offset in bytes
+   * @param chunkLength chunk length in bytes
+   * @return CopyListingFileStatus
    * @throws IOException if there is an I/O error
    */
-  public static CopyListingFileStatus toCopyListingFileStatus(
+  public static CopyListingFileStatus toCopyListingFileStatusHelper(
       FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, 
-      boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException {
+      boolean preserveXAttrs, boolean preserveRawXAttrs,
+      long chunkOffset, long chunkLength) throws IOException {
     CopyListingFileStatus copyListingFileStatus =
-      new CopyListingFileStatus(fileStatus);
+        new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength);
     if (preserveAcls) {
       FsPermission perm = fileStatus.getPermission();
       if (perm.getAclBit()) {
@@ -465,4 +553,19 @@ public class DistCpUtils {
     return (sourceChecksum == null || targetChecksum == null ||
             sourceChecksum.equals(targetChecksum));
   }
+
+  /**
+   * Return the Path for a given chunk.
+   * Used when splitting a large file into chunks to copy in parallel.
+   * @param targetFile path to target file
+   * @param srcFileStatus source file status in copy listing
+   * @return path to the chunk specified by the parameters to store
+   * in target cluster temporarily
+   */
+  public static Path getSplitChunkPath(Path targetFile,
+      CopyListingFileStatus srcFileStatus) {
+    return new Path(targetFile.toString()
+        + ".____distcpSplit____" + srcFileStatus.getChunkOffset()
+        + "." + srcFileStatus.getChunkLength());
+  }
 }

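A worked example of the splitting rule in toCopyListingFileStatus above, under illustrative numbers (128 MB blocks, blocksPerChunk = 10, a 25-block file): the file is split because it is longer than blockSize * blocksPerChunk and has more blocks than blocksPerChunk, and each chunk covers up to 10 consecutive blocks. This standalone sketch reproduces only the offset/length arithmetic:

import java.util.Arrays;

public class ChunkSplitMath {
  public static void main(String[] args) {
    final long blockSize = 128L * 1024 * 1024;  // hypothetical 128 MB blocks
    final int blocksPerChunk = 10;
    // 25 blocks: 24 full blocks plus a short last block
    long[] blockLengths = new long[25];
    Arrays.fill(blockLengths, blockSize);
    blockLengths[24] = 10L * 1024 * 1024;

    long offset = 0;
    int i = 0;
    while (i < blockLengths.length) {
      long chunkLength = 0;
      for (int j = 0; j < blocksPerChunk && i < blockLengths.length; ++j, ++i) {
        chunkLength += blockLengths[i];
      }
      // prints chunks of 10, 10 and 5 blocks at increasing offsets
      System.out.println("chunk offset=" + offset + " length=" + chunkLength);
      offset += chunkLength;
    }
  }
}
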
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index d153485..e6cff10 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -239,6 +239,7 @@ Flag              | Description                          | Notes
 `-rdiff <newSnapshot> <oldSnapshot>` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `<oldSnapshot>` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's `<oldSnapshot>`, to make the target the same as `<oldSnapshot>`. | This option is valid only with `-update` option and the following conditions should be satisfied. <ol><li>Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target's `<oldSnapshot>` to target's current state).</li>  <li> Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on target since `<newSnapshot>` was created on the target. </li> <li> The sour
 ce has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same with source's `<oldSnapshot>`.</li> </ol> |
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
+`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
 
 Architecture of DistCp
 ----------------------

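For illustration, the documented flag can also be driven from Java through ToolRunner, much like the new TestDistCpSystem cases later in this message; the namenode URIs and the chunk size of 8 are placeholders, and constructing DistCp with a null DistCpOptions here is intended to defer option parsing to run(), as the command-line entry point does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.util.ToolRunner;

public class DistCpBlocksPerChunkCli {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Equivalent to: hadoop distcp -blocksperchunk 8 <src> <dst>
    String[] distcpArgs = new String[] {
        "-blocksperchunk", "8",
        "hdfs://nn1:8020/src/data",
        "hdfs://nn2:8020/dst/data"
    };
    int rc = ToolRunner.run(conf, new DistCp(conf, null), distcpArgs);
    System.exit(rc);
  }
}
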
http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index 74a100c..df36c26 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -312,7 +312,7 @@ public class TestDistCpOptions {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null'}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
index e3018a0..b2266b3 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
@@ -23,17 +23,27 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -47,11 +57,15 @@ import org.junit.rules.Timeout;
  */
 
 public class TestDistCpSystem {
+  private static final Log LOG =
+      LogFactory.getLog(TestDistCpSystem.class);
+
   @Rule
   public Timeout globalTimeout = new Timeout(30000);
 
   private static final String SRCDAT = "srcdat";
   private static final String DSTDAT = "dstdat";
+  private static final long BLOCK_SIZE = 1024;
 
   private static MiniDFSCluster cluster;
   private static Configuration conf;
@@ -63,27 +77,76 @@ public class TestDistCpSystem {
       this.path = path;
       this.isDir = isDir;
     }
-    String getPath() { return path; }
-    boolean isDirectory() { return isDir; }
+
+    String getPath() {
+      return path;
+    }
+
+    boolean isDirectory() {
+      return isDir;
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  static String execCmd(FsShell shell, String... args) throws Exception {
+    ByteArrayOutputStream baout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(baout, true);
+    PrintStream old = System.out;
+    System.setOut(out);
+    shell.run(args);
+    out.close();
+    System.setOut(old);
+    return baout.toString();
   }
   
-  private void createFiles(FileSystem fs, String topdir,
-      FileEntry[] entries) throws IOException {
+  private void createFiles(DistributedFileSystem fs, String topdir,
+      FileEntry[] entries, long chunkSize) throws IOException {
+    long seed = System.currentTimeMillis();
+    Random rand = new Random(seed);
+    short replicationFactor = 2;
     for (FileEntry entry : entries) {
-      Path newpath = new Path(topdir + "/" + entry.getPath());
+      Path newPath = new Path(topdir + "/" + entry.getPath());
       if (entry.isDirectory()) {
-        fs.mkdirs(newpath);
+        fs.mkdirs(newPath);
       } else {
-        OutputStream out = fs.create(newpath);
-        try {
-          out.write((topdir + "/" + entry).getBytes());
-          out.write("\n".getBytes());
-        } finally {
-          out.close();
+        long fileSize = BLOCK_SIZE * 100;
+        int bufSize = 128;
+        if (chunkSize == -1) {
+          DFSTestUtil.createFile(fs, newPath, bufSize,
+              fileSize, BLOCK_SIZE, replicationFactor, seed);
+        } else {
+          // Create a variable length block file, by creating
+          // one block of half block size at the chunk boundary
+          long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2;
+          long seg2 = fileSize - seg1;
+          DFSTestUtil.createFile(fs, newPath, bufSize,
+              seg1, BLOCK_SIZE, replicationFactor, seed);
+          DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2);
         }
       }
+      seed = System.currentTimeMillis() + rand.nextLong();
     }
   }
+
+  private void createFiles(DistributedFileSystem fs, String topdir,
+      FileEntry[] entries) throws IOException {
+    createFiles(fs, topdir, entries, -1);
+  }
    
   private static FileStatus[] getFileStatus(FileSystem fs,
       String topdir, FileEntry[] files) throws IOException {
@@ -104,18 +167,19 @@ public class TestDistCpSystem {
   }
 
   private void testPreserveUserHelper(String testRoot,
-                                      FileEntry[] srcEntries,
-                                      FileEntry[] dstEntries,
-                                      boolean createSrcDir,
-                                      boolean createTgtDir,
-                                      boolean update) throws Exception {
+      FileEntry[] srcEntries,
+      FileEntry[] dstEntries,
+      boolean createSrcDir,
+      boolean createTgtDir,
+      boolean update) throws Exception {
     final String testSrcRel = SRCDAT;
     final String testSrc = testRoot + "/" + testSrcRel;
     final String testDstRel = DSTDAT;
     final String testDst = testRoot + "/" + testDstRel;
 
     String nnUri = FileSystem.getDefaultUri(conf).toString();
-    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+    DistributedFileSystem fs = (DistributedFileSystem)
+        FileSystem.get(URI.create(nnUri), conf);
     fs.mkdirs(new Path(testRoot));
     if (createSrcDir) {
       fs.mkdirs(new Path(testSrc));
@@ -129,8 +193,8 @@ public class TestDistCpSystem {
     for(int i = 0; i < srcEntries.length; i++) {
       fs.setOwner(srcstats[i].getPath(), "u" + i, null);
     }
-    String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc,
-        nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst};
+    String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc,
+        nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst};
 
     ToolRunner.run(conf, new DistCp(), args);
 
@@ -145,20 +209,263 @@ public class TestDistCpSystem {
     deldir(fs, testRoot);
   }
 
-  @BeforeClass
-  public static void beforeClass() throws IOException {
-    conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    cluster.waitActive();
+  private void compareFiles(FileSystem fs, FileStatus srcStat,
+      FileStatus dstStat) throws Exception {
+    LOG.info("Comparing " + srcStat + " and " + dstStat);
+    assertEquals(srcStat.isDirectory(), dstStat.isDirectory());
+    assertEquals(srcStat.getReplication(), dstStat.getReplication());
+    assertEquals("File POSIX permission should match",
+        srcStat.getPermission(), dstStat.getPermission());
+    assertEquals("File user ownership should match",
+        srcStat.getOwner(), dstStat.getOwner());
+    assertEquals("File group ownership should match",
+        srcStat.getGroup(), dstStat.getGroup());
+    // TODO; check ACL attributes
+
+    if (srcStat.isDirectory()) {
+      return;
+    }
+
+    assertEquals("File length should match (" + srcStat.getPath() + ")",
+        srcStat.getLen(), dstStat.getLen());
+
+    FSDataInputStream srcIn = fs.open(srcStat.getPath());
+    FSDataInputStream dstIn = fs.open(dstStat.getPath());
+    try {
+      byte[] readSrc = new byte[(int)
+                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
+      byte[] readDst = new byte[(int)
+                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
+
+      int srcBytesRead = 0, tgtBytesRead = 0;
+      int srcIdx = 0, tgtIdx = 0;
+      long totalComparedBytes = 0;
+      while (true) {
+        if (srcBytesRead == 0) {
+          srcBytesRead = srcIn.read(readSrc);
+          srcIdx = 0;
+        }
+        if (tgtBytesRead == 0) {
+          tgtBytesRead = dstIn.read(readDst);
+          tgtIdx = 0;
+        }
+        if (srcBytesRead == 0 || tgtBytesRead == 0) {
+          LOG.info("______ compared src and dst files for "
+              + totalComparedBytes + " bytes, content match.");
+          if (srcBytesRead != tgtBytesRead) {
+            Assert.fail("Read mismatching size, compared "
+                + totalComparedBytes + " bytes between src and dst file "
+                + srcStat + " and " + dstStat);
+          }
+          if (totalComparedBytes != srcStat.getLen()) {
+            Assert.fail("Only read/compared " + totalComparedBytes +
+                " bytes between src and dst file " + srcStat +
+                " and " + dstStat);
+          } else {
+            // success
+            break;
+          }
+        }
+        for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead;
+            ++srcIdx, ++tgtIdx) {
+          if (readSrc[srcIdx] != readDst[tgtIdx]) {
+            Assert.fail("src and dst file does not match at "
+                + totalComparedBytes + " between "
+                + srcStat + " and " + dstStat);
+          }
+          ++totalComparedBytes;
+        }
+        LOG.info("______ compared src and dst files for "
+            + totalComparedBytes + " bytes, content match. FileLength: "
+            + srcStat.getLen());
+        if (totalComparedBytes == srcStat.getLen()) {
+          LOG.info("______ Final:" + srcIdx + " "
+              + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead);
+          break;
+        }
+        if (srcIdx == srcBytesRead) {
+          srcBytesRead = 0;
+        }
+        if (tgtIdx == tgtBytesRead) {
+          tgtBytesRead = 0;
+        }
+      }
+    } finally {
+      if (srcIn != null) {
+        srcIn.close();
+      }
+      if (dstIn != null) {
+        dstIn.close();
+      }
+    }
   }
 
-  @AfterClass
-  public static void afterClass() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
+  // WC: needed because the current distcp does not create target dirs
+  private void createDestDir(FileSystem fs, String testDst,
+      FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException {
+    fs.mkdirs(new Path(testDst));
+
+    for (int i=0; i<srcStats.length; i++) {
+      FileStatus srcStat = srcStats[i];
+      if (srcStat.isDirectory()) {
+        Path dstPath = new Path(testDst, srcFiles[i].getPath());
+        fs.mkdirs(dstPath);
+        fs.setOwner(dstPath, srcStat.getOwner(), srcStat.getGroup());
+      }
+    }
+  }
+
+  private void copyAndVerify(final DistributedFileSystem fs,
+      final FileEntry[] srcFiles, final FileStatus[] srcStats,
+      final String testDst,
+      final String[] args) throws Exception {
+    final String testRoot = "/testdir";
+    FsShell shell = new FsShell(fs.getConf());
+
+    LOG.info("ls before distcp");
+    LOG.info(execCmd(shell, "-lsr", testRoot));
+
+    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
+    ToolRunner.run(conf, new DistCp(), args);
+
+    LOG.info("ls after distcp");
+    LOG.info(execCmd(shell, "-lsr", testRoot));
+
+    FileStatus[] dstStat = getFileStatus(fs, testDst, srcFiles);
+    for (int i=0; i< dstStat.length; i++) {
+      compareFiles(fs, srcStats[i], dstStat[i]);
     }
   }
 
+  private void chunkCopy(FileEntry[] srcFiles) throws Exception {
+    final String testRoot = "/testdir";
+    final String testSrcRel = SRCDAT;
+    final String testSrc = testRoot + "/" + testSrcRel;
+    final String testDstRel = DSTDAT;
+    final String testDst = testRoot + "/" + testDstRel;
+    long chunkSize = 8;
+
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    DistributedFileSystem fs = (DistributedFileSystem)
+        FileSystem.get(URI.create(nnUri), conf);
+
+    createFiles(fs, testRoot, srcFiles, chunkSize);
+
+    FileStatus[] srcStats = getFileStatus(fs, testRoot, srcFiles);
+    for (int i = 0; i < srcFiles.length; i++) {
+      fs.setOwner(srcStats[i].getPath(), "u" + i,  "g" + i);
+    }
+    // get file status after updating owners
+    srcStats = getFileStatus(fs, testRoot, srcFiles);
+
+    createDestDir(fs, testDst, srcStats, srcFiles);
+
+    String[] args = new String[] {"-pugp", "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc, nnUri + testDst};
+
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+    // Do it again
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+
+    // modify last file and rerun distcp with -update option
+    LOG.info("Modify a file and copy again");
+    for(int i=srcFiles.length-1; i >=0; --i) {
+      if (!srcFiles[i].isDirectory()) {
+        LOG.info("Modifying " + srcStats[i].getPath());
+        DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(),
+            (int)BLOCK_SIZE * 3);
+        break;
+      }
+    }
+    // get file status after modifying file
+    srcStats = getFileStatus(fs, testRoot, srcFiles);
+
+    args = new String[] {"-pugp", "-update", "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc, nnUri + testDst + "/" + testSrcRel};
+
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+
+    deldir(fs, testRoot);
+  }
+
+  @Test
+  public void testRecursiveChunkCopy() throws Exception {
+    FileEntry[] srcFiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file0", false),
+        new FileEntry(SRCDAT + "/dir1", true),
+        new FileEntry(SRCDAT + "/dir2", true),
+        new FileEntry(SRCDAT + "/dir1/file1", false)
+    };
+    chunkCopy(srcFiles);
+  }
+
+  @Test
+  public void testChunkCopyOneFile() throws Exception {
+    FileEntry[] srcFiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file0", false)
+    };
+    chunkCopy(srcFiles);
+  }
+
+  @Test
+  public void testDistcpLargeFile() throws Exception {
+    FileEntry[] srcfiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file", false)
+    };
+
+    final String testRoot = "/testdir";
+    final String testSrcRel = SRCDAT;
+    final String testSrc = testRoot + "/" + testSrcRel;
+    final String testDstRel = DSTDAT;
+    final String testDst = testRoot + "/" + testDstRel;
+
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    fs.mkdirs(new Path(testRoot));
+    fs.mkdirs(new Path(testSrc));
+    fs.mkdirs(new Path(testDst));
+    long chunkSize = 6;
+    createFiles(fs, testRoot, srcfiles, chunkSize);
+
+    String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath();
+    Path srcfile = new Path(srcFileName);
+
+    if (!cluster.getFileSystem().exists(srcfile)) {
+      throw new Exception("src does not exist");
+    }
+
+    final long srcLen = fs.getFileStatus(srcfile).getLen();
+
+    FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles);
+    for (int i = 0; i < srcfiles.length; i++) {
+      fs.setOwner(srcstats[i].getPath(), "u" + i, null);
+    }
+    String[] args = new String[] {
+        "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc,
+        nnUri + testDst
+    };
+
+    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
+    ToolRunner.run(conf, new DistCp(), args);
+
+    String realTgtPath = testDst;
+    FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles);
+    assertEquals("File length should match", srcLen,
+        dststat[dststat.length - 1].getLen());
+
+    this.compareFiles(fs,  srcstats[srcstats.length-1],
+        dststat[dststat.length-1]);
+    deldir(fs, testRoot);
+  }
+
   @Test
   public void testPreserveUseNonEmptyDir() throws Exception {
     String testRoot = "/testdir." + getMethodName();
@@ -180,7 +487,6 @@ public class TestDistCpSystem {
     testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false);
   }
 
-
   @Test
   public void testPreserveUserEmptyDir() throws Exception {
     String testRoot = "/testdir." + getMethodName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index 35778d2..acffb76 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -407,7 +407,7 @@ public class TestOptionsParser {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null'}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49dc5472/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 2e9a350..2452d6f 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -81,6 +81,10 @@ public class TestCopyCommitter {
   @Before
   public void createMetaFolder() {
     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
+    // Unset the listing file path since the config is shared by
+    // multiple tests, and some tests (such as testNoCommitAction)
+    // don't set it, but the DistCp code checks it.
+    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
     Path meta = new Path("/meta");
     try {
       cluster.getFileSystem().mkdirs(meta);
@@ -326,7 +330,6 @@ public class TestCopyCommitter {
       committer.commitJob(jobContext);
       Assert.assertFalse(fs.exists(new Path(workPath)));
       Assert.assertTrue(fs.exists(new Path(finalPath)));
-
     } catch (IOException e) {
       LOG.error("Exception encountered while testing for preserve status", e);
       Assert.fail("Atomic commit failure");




[4/4] hadoop git commit: HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)

Posted by yj...@apache.org.
HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd552a97
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd552a97
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd552a97

Branch: refs/heads/branch-2
Commit: dd552a97b7633a50055dc1529b52a276e1f1af0e
Parents: 49dc547
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Fri May 19 21:11:38 2017 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Wed May 24 19:05:40 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/tools/DistCpConstants.java    |  6 +++
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  8 ++++
 .../org/apache/hadoop/tools/DistCpOptions.java  | 21 +++++++++-
 .../org/apache/hadoop/tools/OptionsParser.java  | 27 ++++++++++++-
 .../tools/mapred/RetriableFileCopyCommand.java  | 11 +++--
 .../src/site/markdown/DistCp.md.vm              |  1 +
 .../apache/hadoop/tools/TestDistCpOptions.java  |  3 +-
 .../apache/hadoop/tools/TestOptionsParser.java  | 42 +++++++++++++++++++-
 8 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
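
Not part of the patch itself: a minimal sketch of how the new -copybuffersize switch can be exercised, driving DistCp through its public main() just as the command line would. The source/target URIs and the 4 MB value are placeholders, assuming a reachable HDFS namenode.

    import org.apache.hadoop.tools.DistCp;

    public class CopyBufferSizeUsageSketch {
      public static void main(String[] args) {
        // -copybuffersize overrides the 8192B default used when writing to the target.
        DistCp.main(new String[] {
            "-copybuffersize", "4194304",             // hypothetical 4 MB buffer
            "hdfs://localhost:8020/source/first",     // placeholder source
            "hdfs://localhost:8020/target/"           // placeholder target
        });
      }
    }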


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 6cea583..0541b75 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -113,6 +113,10 @@ public class DistCpConstants {
   /* DistCp CopyListing class override param */
   public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
 
+  /* DistCp Copy Buffer Size */
+  public static final String CONF_LABEL_COPY_BUFFER_SIZE =
+      "distcp.copy.buffer.size";
+
   /**
    * Conf label for SSL Trust-store location.
    */
@@ -157,4 +161,6 @@ public class DistCpConstants {
   public static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
 
   static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
+
+  public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e76a48e..49ec035 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -189,6 +189,14 @@ public enum DistCpOptionSwitch {
           + "system implements concat method")),
 
   /**
+   * Configurable copy buffer size.
+   */
+  COPY_BUFFER_SIZE(DistCpConstants.CONF_LABEL_COPY_BUFFER_SIZE,
+      new Option("copybuffersize", true, "Size of the copy buffer to use. "
+          + "By default <copybuffersize> is "
+          + DistCpConstants.COPY_BUFFER_SIZE_DEFAULT + "B.")),
+
+  /**
    * Specify bandwidth per map in MB
    */
   BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 2efb96b..b3c843f 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -104,6 +104,11 @@ public class DistCpOptions {
   // to copy in parallel. Default is 0 and files are not split.
   private int blocksPerChunk = 0;
 
+  /**
+   * The copyBufferSize to use in RetriableFileCopyCommand
+   */
+  private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
+
   public static enum FileAttribute{
     REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
 
@@ -174,6 +179,7 @@ public class DistCpOptions {
       this.targetPathExists = that.getTargetPathExists();
       this.filtersFile = that.getFiltersFile();
       this.blocksPerChunk = that.blocksPerChunk;
+      this.copyBufferSize = that.copyBufferSize;
     }
   }
 
@@ -464,7 +470,7 @@ public class DistCpOptions {
   }
 
   /**
-   * Checks if the input attribute should be preserved or not
+   * Checks if the input attribute should be preserved or not.
    *
    * @param attribute - Attribute to check
    * @return True if attribute should be preserved, false otherwise
@@ -640,6 +646,16 @@ public class DistCpOptions {
     return blocksPerChunk > 0;
   }
 
+  public final void setCopyBufferSize(int newCopyBufferSize) {
+    this.copyBufferSize =
+        newCopyBufferSize > 0 ? newCopyBufferSize
+            : DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
+  }
+
+  public int getCopyBufferSize() {
+    return this.copyBufferSize;
+  }
+
   public void validate(DistCpOptionSwitch option, boolean value) {
 
     boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@@ -736,6 +752,8 @@ public class DistCpOptions {
     }
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
         String.valueOf(blocksPerChunk));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
+        String.valueOf(copyBufferSize));
   }
 
   /**
@@ -773,6 +791,7 @@ public class DistCpOptions {
         ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
         ", blocksPerChunk=" + blocksPerChunk +
+        ", copyBufferSize=" + copyBufferSize +
         '}';
   }
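
An illustrative sketch (not from the patch) of the fallback behaviour the new setter implies: non-positive values revert to COPY_BUFFER_SIZE_DEFAULT, positive values are kept. It assumes the branch-2 mutable DistCpOptions and its (Path sourceFileListing, Path targetPath) constructor, as exercised by TestDistCpOptions below; the "abc"/"xyz" paths are placeholders.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpConstants;
    import org.apache.hadoop.tools.DistCpOptions;

    public class CopyBufferSizeSetterSketch {
      public static void main(String[] args) {
        DistCpOptions options =
            new DistCpOptions(new Path("abc"), new Path("xyz")); // placeholder listing/target
        options.setCopyBufferSize(0);       // non-positive -> falls back to the default
        System.out.println(options.getCopyBufferSize()
            == DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);        // true (8192)
        options.setCopyBufferSize(4194304);                      // positive values are kept as-is
        System.out.println(options.getCopyBufferSize());         // 4194304
      }
    }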
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index c68102d..868ae73 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -186,6 +186,8 @@ public class OptionsParser {
 
     parseBlocksPerChunk(command, option);
 
+    parseCopyBufferSize(command, option);
+
     return option;
   }
 
@@ -221,8 +223,29 @@ public class OptionsParser {
   }
 
   /**
-   * parseSizeLimit is a helper method for parsing the deprecated
-   * argument SIZE_LIMIT.
+   * A helper method to parse copyBufferSize.
+   *
+   * @param command command line arguments
+   */
+  private static void parseCopyBufferSize(CommandLine command,
+      DistCpOptions option) {
+    if (command.hasOption(DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch())) {
+      String copyBufferSizeStr =
+          getVal(command,
+              DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch()).trim();
+      try {
+        int copyBufferSize = Integer.parseInt(copyBufferSizeStr);
+        option.setCopyBufferSize(copyBufferSize);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("copyBufferSize is invalid: "
+            + copyBufferSizeStr, e);
+      }
+    }
+  }
+
+  /**
+   * parseSizeLimit is a helper method for parsing the deprecated argument
+   * SIZE_LIMIT.
    *
    * @param command command line arguments
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 58a51af..ddf2725 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.apache.hadoop.tools.util.DistCpUtils;
@@ -53,7 +54,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class RetriableFileCopyCommand extends RetriableCommand {
 
   private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
-  private static int BUFFER_SIZE = 8 * 1024;
   private boolean skipCrc = false;
   private FileAction action;
 
@@ -169,6 +169,9 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));
+    int copyBufferSize = context.getConfiguration().getInt(
+        DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
     final OutputStream outStream;
     if (action == FileAction.OVERWRITE) {
       final short repl = getReplicationFactor(fileAttributes, source,
@@ -177,14 +180,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           targetFS, targetPath);
       FSDataOutputStream out = targetFS.create(targetPath, permission,
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
-          BUFFER_SIZE, repl, blockSize, context,
+          copyBufferSize, repl, blockSize, context,
           getChecksumOpt(fileAttributes, sourceChecksum));
       outStream = new BufferedOutputStream(out);
     } else {
       outStream = new BufferedOutputStream(targetFS.append(targetPath,
-          BUFFER_SIZE));
+          copyBufferSize));
     }
-    return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
+    return copyBytes(source, sourceOffset, outStream, copyBufferSize,
         context);
   }
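
For reference, a small sketch (not part of the patch) of how the buffer size above is resolved from the job configuration; the config label and the 8192B default come straight from this diff, while the 1 MB override is a hypothetical value.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpConstants;
    import org.apache.hadoop.tools.DistCpOptionSwitch;

    public class ResolveCopyBufferSizeSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // DistCpOptions.appendToConf() stores the parsed value under this label.
        conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(), 1024 * 1024);
        // Same lookup as in the hunk above: fall back to 8192B when unset.
        int copyBufferSize = conf.getInt(
            DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
            DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
        System.out.println(copyBufferSize); // 1048576
      }
    }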
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index e6cff10..32c71dd 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -240,6 +240,7 @@ Flag              | Description                          | Notes
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
 `-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
+`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B. |
 
 Architecture of DistCp
 ----------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
index df36c26..de75e7d 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
@@ -312,7 +312,8 @@ public class TestDistCpOptions {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0, "
+        + "copyBufferSize=8192}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd552a97/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index acffb76..d1ef56a 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -407,7 +407,8 @@ public class TestOptionsParser {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0, "
+        + "copyBufferSize=8192}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
@@ -773,4 +774,43 @@ public class TestOptionsParser {
         "hdfs://localhost:8020/target/"});
     Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
   }
+
+  @Test
+  public void testParseCopyBufferSize() {
+    DistCpOptions options =
+        OptionsParser.parse(new String[] {
+            "hdfs://localhost:8020/source/first",
+            "hdfs://localhost:8020/target/" });
+    Assert.assertEquals(options.getCopyBufferSize(),
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
+
+    options =
+        OptionsParser.parse(new String[] { "-copybuffersize", "0",
+            "hdfs://localhost:8020/source/first",
+            "hdfs://localhost:8020/target/" });
+    Assert.assertEquals(options.getCopyBufferSize(),
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
+
+    options =
+        OptionsParser.parse(new String[] { "-copybuffersize", "-1",
+            "hdfs://localhost:8020/source/first",
+            "hdfs://localhost:8020/target/" });
+    Assert.assertEquals(options.getCopyBufferSize(),
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
+
+    options =
+        OptionsParser.parse(new String[] { "-copybuffersize", "4194304",
+            "hdfs://localhost:8020/source/first",
+            "hdfs://localhost:8020/target/" });
+    Assert.assertEquals(options.getCopyBufferSize(), 4194304);
+
+    try {
+      OptionsParser
+          .parse(new String[] { "-copybuffersize", "hello",
+              "hdfs://localhost:8020/source/first",
+              "hdfs://localhost:8020/target/" });
+      Assert.fail("Non numberic copybuffersize parsed successfully!");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
 }

