You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/05/22 20:50:21 UTC

svn commit: r1596937 - in /hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src: main/java/org/apache/hadoop/tools/ main/java/org/apache/hadoop/tools/mapred/ main/java/org/apache/hadoop/tools/util/ test/java/org/apache/hadoop/tools/ test/java...

Author: jing9
Date: Thu May 22 18:50:20 2014
New Revision: 1596937

URL: http://svn.apache.org/r1596937
Log:
MAPREDUCE-5899. Merge change r1596931 from trunk.

Modified:
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
    hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Thu May 22 18:50:20 2014
@@ -50,6 +50,7 @@ public class DistCpConstants {
   public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
   public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
   public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+  public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
   
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java Thu May 22 18:50:20 2014
@@ -138,6 +138,10 @@ public enum DistCpOptionSwitch {
       new Option("overwrite", false, "Choose to overwrite target files " +
           "unconditionally, even if they exist.")),
 
+  APPEND(DistCpConstants.CONF_LABEL_APPEND,
+      new Option("append", false,
+          "Reuse existing data in target files and append new data to them if possible")),
+
   /**
    * Should DisctpExecution be blocking
    */

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java Thu May 22 18:50:20 2014
@@ -39,6 +39,7 @@ public class DistCpOptions {
   private boolean deleteMissing = false;
   private boolean ignoreFailures = false;
   private boolean overwrite = false;
+  private boolean append = false;
   private boolean skipCRC = false;
   private boolean blocking = true;
 
@@ -245,6 +246,22 @@ public class DistCpOptions {
   }
 
   /**
+   * @return whether we can append new data to target files
+   */
+  public boolean shouldAppend() {
+    return append;
+  }
+
+  /**
+   * Set whether to append new data to target files. This is valid only with
+   * the update option and when the CRC check is not skipped.
+   */
+  public void setAppend(boolean append) {
+    validate(DistCpOptionSwitch.APPEND, append);
+    this.append = append;
+  }
+
+  /**
    * Should CRC/checksum check be skipped while checking files are identical
    *
    * @return true if checksum check should be skipped while checking files are
@@ -472,6 +489,7 @@ public class DistCpOptions {
         value : this.atomicCommit);
     boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
         value : this.skipCRC);
+    boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
 
     if (syncFolder && atomicCommit) {
       throw new IllegalArgumentException("Atomic commit can't be used with " +
@@ -492,6 +510,14 @@ public class DistCpOptions {
       throw new IllegalArgumentException("Skip CRC is valid only with update options");
     }
 
+    if (!syncFolder && append) {
+      throw new IllegalArgumentException(
+          "Append is valid only with update options");
+    }
+    if (skipCRC && append) {
+      throw new IllegalArgumentException(
+          "Append is disallowed when skipping CRC");
+    }
   }
 
   /**
@@ -510,6 +536,8 @@ public class DistCpOptions {
         String.valueOf(deleteMissing));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
         String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
+        String.valueOf(append));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
         String.valueOf(skipCRC));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java Thu May 22 18:50:20 2014
@@ -140,6 +140,10 @@ public class OptionsParser {
       option.setOverwrite(true);
     }
 
+    if (command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) {
+      option.setAppend(true);
+    }
+
     if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
       option.setDeleteMissing(true);
     }

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Thu May 22 18:50:20 2014
@@ -18,13 +18,20 @@
 
 package org.apache.hadoop.tools.mapred;
 
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.EnumSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -36,11 +43,6 @@ import org.apache.hadoop.tools.DistCpOpt
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.*;
-import java.util.EnumSet;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Mapper class that executes the DistCp copy operation.
  * Implements the o.a.h.mapreduce.Mapper<> interface.
@@ -62,6 +64,15 @@ public class CopyMapper extends Mapper<T
     BYTESSKIPPED, // Number of bytes that were skipped from copy.
   }
 
+  /**
+   * Indicate the action for each file
+   */
+  static enum FileAction {
+    SKIP,         // Skip copying the file since it's already in the target FS
+    APPEND,       // Only need to append new data to the file in the target FS 
+    OVERWRITE,    // Overwrite the whole file
+  }
+
   private static Log LOG = LogFactory.getLog(CopyMapper.class);
 
   private Configuration conf;
@@ -70,6 +81,7 @@ public class CopyMapper extends Mapper<T
   private boolean ignoreFailures = false;
   private boolean skipCrc = false;
   private boolean overWrite = false;
+  private boolean append = false;
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
@@ -90,6 +102,7 @@ public class CopyMapper extends Mapper<T
     ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
     skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
     overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+    append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
     preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
         PRESERVE_STATUS.getConfigLabel()));
 
@@ -224,20 +237,19 @@ public class CopyMapper extends Mapper<T
         return;
       }
 
-      if (skipFile(sourceFS, sourceCurrStatus, target)) {
+      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
+      if (action == FileAction.SKIP) {
         LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                  + " to " + target);
         updateSkipCounters(context, sourceCurrStatus);
         context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
-      }
-      else {
+      } else {
         copyFileWithRetry(description, sourceCurrStatus, target, context,
-                          fileAttributes);
+            action, fileAttributes);
       }
 
       DistCpUtils.preserve(target.getFileSystem(conf), target,
                            sourceCurrStatus, fileAttributes);
-
     } catch (IOException exception) {
       handleFailures(exception, sourceFileStatus, target, context);
     }
@@ -254,14 +266,14 @@ public class CopyMapper extends Mapper<T
     return DistCpUtils.unpackAttributes(attributeString);
   }
 
-  private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
-               Path target, Context context,
-               EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
-
+  private void copyFileWithRetry(String description,
+      FileStatus sourceFileStatus, Path target, Context context,
+      FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
+      throws IOException {
     long bytesCopied;
     try {
-      bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description)
-                       .execute(sourceFileStatus, target, context, fileAttributes);
+      bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
+          action).execute(sourceFileStatus, target, context, fileAttributes);
     } catch (Exception e) {
       context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
       throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
@@ -311,25 +323,48 @@ public class CopyMapper extends Mapper<T
     context.getCounter(counter).increment(value);
   }
 
-  private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
-                                          throws IOException {
-    return     targetFS.exists(target)
-            && !overWrite
-            && !mustUpdate(sourceFS, source, target);
-  }
-
-  private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
-                                    throws IOException {
-    final FileStatus targetFileStatus = targetFS.getFileStatus(target);
-
-    return     syncFolders
-            && (
-                   targetFileStatus.getLen() != source.getLen()
-                || (!skipCrc &&
-                       !DistCpUtils.checksumsAreEqual(sourceFS,
-                          source.getPath(), null, targetFS, target))
-                || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
-                      preserve.contains(FileAttribute.BLOCKSIZE))
-               );
+  private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
+      Path target) throws IOException {
+    final FileStatus targetFileStatus;
+    try {
+      targetFileStatus = targetFS.getFileStatus(target);
+    } catch (FileNotFoundException e) {
+      return FileAction.OVERWRITE;
+    }
+    if (targetFileStatus != null && !overWrite) {
+      if (canSkip(sourceFS, source, targetFileStatus)) {
+        return FileAction.SKIP;
+      } else if (append) {
+        long targetLen = targetFileStatus.getLen();
+        if (targetLen < source.getLen()) {
+          FileChecksum sourceChecksum = sourceFS.getFileChecksum(
+              source.getPath(), targetLen);
+          if (sourceChecksum != null
+              && sourceChecksum.equals(targetFS.getFileChecksum(target))) {
+            // We require that the checksum is not null. Thus currently only
+            // DistributedFileSystem is supported
+            return FileAction.APPEND;
+          }
+        }
+      }
+    }
+    return FileAction.OVERWRITE;
+  }
+
+  private boolean canSkip(FileSystem sourceFS, FileStatus source, 
+      FileStatus target) throws IOException {
+    if (!syncFolders) {
+      return true;
+    }
+    boolean sameLength = target.getLen() == source.getLen();
+    boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
+        || !preserve.contains(FileAttribute.BLOCKSIZE);
+    if (sameLength && sameBlockSize) {
+      return skipCrc ||
+          DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
+              targetFS, target.getPath());
+    } else {
+      return false;
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu May 22 18:50:20 2014
@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.tools.mapred;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.EnumSet;
 
@@ -29,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.tools.util.RetriableCommand;
 import org.apache.hadoop.tools.util.ThrottledInputStream;
@@ -54,13 +55,15 @@ public class RetriableFileCopyCommand ex
   private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
   private static int BUFFER_SIZE = 8 * 1024;
   private boolean skipCrc = false;
+  private FileAction action;
 
   /**
    * Constructor, taking a description of the action.
    * @param description Verbose description of the copy operation.
    */
-  public RetriableFileCopyCommand(String description) {
+  public RetriableFileCopyCommand(String description, FileAction action) {
     super(description);
+    this.action = action;
   }
 
   /**
@@ -68,9 +71,11 @@ public class RetriableFileCopyCommand ex
    *
    * @param skipCrc Whether to skip the crc check.
    * @param description A verbose description of the copy operation.
+   * @param action Whether to overwrite the target file or append new data to it.
    */
-  public RetriableFileCopyCommand(boolean skipCrc, String description) {
-    this(description);
+  public RetriableFileCopyCommand(boolean skipCrc, String description,
+      FileAction action) {
+    this(description, action);
     this.skipCrc = skipCrc;
   }
 
@@ -96,18 +101,17 @@ public class RetriableFileCopyCommand ex
   }
 
   private long doCopy(FileStatus sourceFileStatus, Path target,
-                      Mapper.Context context,
-                      EnumSet<FileAttribute> fileAttributes)
-          throws IOException {
-
-    Path tmpTargetPath = getTmpFile(target, context);
+      Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
+      throws IOException {
+    final boolean toAppend = action == FileAction.APPEND;
+    Path targetPath = toAppend ? target : getTmpFile(target, context);
     final Configuration configuration = context.getConfiguration();
     FileSystem targetFS = target.getFileSystem(configuration);
 
     try {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
-        LOG.debug("Tmp-file path: " + tmpTargetPath);
+        LOG.debug("Target file path: " + targetPath);
       }
       final Path sourcePath = sourceFileStatus.getPath();
       final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
@@ -115,22 +119,31 @@ public class RetriableFileCopyCommand ex
           .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
           .getFileChecksum(sourcePath) : null;
 
-      long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
-          context, fileAttributes, sourceChecksum);
+      final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
+          target).getLen() : 0;
+      long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
+          offset, context, fileAttributes, sourceChecksum);
 
-      compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
-          bytesRead);
+      compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
+          + offset);
       //At this point, src&dest lengths are same. if length==0, we skip checksum
       if ((bytesRead != 0) && (!skipCrc)) {
         compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
-            targetFS, tmpTargetPath);
+            targetFS, targetPath);
+      }
+      // In the non-append case we wrote to a temporary file, so rename it
+      // to the target path now.
+      if (!toAppend) {
+        promoteTmpToTarget(targetPath, target, targetFS);
       }
-      promoteTmpToTarget(tmpTargetPath, target, targetFS);
       return bytesRead;
-
     } finally {
-      if (targetFS.exists(tmpTargetPath))
-        targetFS.delete(tmpTargetPath, false);
+      // Note that in the append case we may have appended partial data before
+      // failing. On the next retry we either reuse the partially appended data
+      // if it is good, or overwrite the whole file.
+      if (!toAppend && targetFS.exists(targetPath)) {
+        targetFS.delete(targetPath, false);
+      }
     }
   }
 
@@ -147,29 +160,37 @@ public class RetriableFileCopyCommand ex
     return null;
   }
 
-  private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
-      FileStatus sourceFileStatus, Mapper.Context context,
+  private long copyToFile(Path targetPath, FileSystem targetFS,
+      FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
       EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));
-    OutputStream outStream = new BufferedOutputStream(
-        targetFS.create(tmpTargetPath, permission,
-            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
-            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
-                tmpTargetPath),
-            getBlockSize(fileAttributes, sourceFileStatus, targetFS,
-                tmpTargetPath),
-            context, getChecksumOpt(fileAttributes, sourceChecksum)));
-    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
+    final OutputStream outStream;
+    if (action == FileAction.OVERWRITE) {
+      final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
+          targetFS, targetPath);
+      final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
+          targetFS, targetPath);
+      FSDataOutputStream out = targetFS.create(targetPath, permission,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+          BUFFER_SIZE, repl, blockSize, context,
+          getChecksumOpt(fileAttributes, sourceChecksum));
+      outStream = new BufferedOutputStream(out);
+    } else {
+      outStream = new BufferedOutputStream(targetFS.append(targetPath,
+          BUFFER_SIZE));
+    }
+    return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
+        context);
   }
 
   private void compareFileLengths(FileStatus sourceFileStatus, Path target,
-                                  Configuration configuration, long bytesRead)
+                                  Configuration configuration, long targetLen)
                                   throws IOException {
     final Path sourcePath = sourceFileStatus.getPath();
     FileSystem fs = sourcePath.getFileSystem(configuration);
-    if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+    if (fs.getFileStatus(sourcePath).getLen() != targetLen)
       throw new IOException("Mismatch in length of source:" + sourcePath
                 + " and target:" + target);
   }
@@ -215,8 +236,8 @@ public class RetriableFileCopyCommand ex
   }
 
   @VisibleForTesting
-  long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
-                         int bufferSize, Mapper.Context context)
+  long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
+      OutputStream outStream, int bufferSize, Mapper.Context context)
       throws IOException {
     Path source = sourceFileStatus.getPath();
     byte buf[] = new byte[bufferSize];
@@ -225,19 +246,21 @@ public class RetriableFileCopyCommand ex
 
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf);
+      int bytesRead = readBytes(inStream, buf, sourceOffset);
       while (bytesRead >= 0) {
         totalBytesRead += bytesRead;
+        if (action == FileAction.APPEND) {
+          sourceOffset += bytesRead;
+        }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, sourceFileStatus);
-        bytesRead = inStream.read(buf);
+        bytesRead = readBytes(inStream, buf, sourceOffset);
       }
       outStream.close();
       outStream = null;
     } finally {
       IOUtils.cleanup(LOG, outStream, inStream);
     }
-
     return totalBytesRead;
   }
 
@@ -254,24 +277,27 @@ public class RetriableFileCopyCommand ex
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(InputStream inStream, byte buf[])
-          throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte buf[],
+      long position) throws IOException {
     try {
-      return inStream.read(buf);
-    }
-    catch (IOException e) {
+      if (position == 0) {
+        return inStream.read(buf);
+      } else {
+        return inStream.read(position, buf, 0, buf.length);
+      }
+    } catch (IOException e) {
       throw new CopyReadException(e);
     }
   }
 
-  private static ThrottledInputStream getInputStream(Path path, Configuration conf)
-          throws IOException {
+  private static ThrottledInputStream getInputStream(Path path,
+      Configuration conf) throws IOException {
     try {
       FileSystem fs = path.getFileSystem(conf);
       long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
-      return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
-              bandwidthMB * 1024 * 1024);
+      FSDataInputStream in = fs.open(path);
+      return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
     }
     catch (IOException e) {
       throw new CopyReadException(e);

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Thu May 22 18:50:20 2014
@@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+
+import com.google.common.base.Preconditions;
+
 /**
  * The ThrottleInputStream provides bandwidth throttling on a specified
  * InputStream. It is implemented as a wrapper on top of another InputStream
@@ -90,6 +95,25 @@ public class ThrottledInputStream extend
     return readLen;
   }
 
+  /**
+   * Read bytes starting from the specified position. This requires that
+   * rawStream be an instance of {@link PositionedReadable}.
+   */
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    if (!(rawStream instanceof PositionedReadable)) {
+      throw new UnsupportedOperationException(
+          "positioned read is not supported by the internal stream");
+    }
+    throttle();
+    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
+        offset, length);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
   private void throttle() throws IOException {
     if (getBytesPerSec() > maxBytesPerSec) {
       try {

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Thu May 22 18:50:20 2014
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.tools;
 
+import static org.junit.Assert.fail;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.DistCpOptions.*;
 import org.apache.hadoop.conf.Configuration;
 
@@ -554,4 +557,45 @@ public class TestOptionsParser {
     Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
     Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
   }
+
+  @Test
+  public void testAppendOption() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+        "-append", "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/" });
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    // make sure -append is only valid when -update is specified
+    try {
+      options = OptionsParser.parse(new String[] { "-append",
+              "hdfs://localhost:8020/source/first",
+              "hdfs://localhost:8020/target/" });
+      fail("Append should fail if update option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is valid only with update options", e);
+    }
+
+    // make sure -append is invalid when skipCrc is specified
+    try {
+      options = OptionsParser.parse(new String[] {
+          "-append", "-update", "-skipcrccheck",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("Append should fail if skipCrc option is specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is disallowed when skipping CRC", e);
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu May 22 18:50:20 2014
@@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -118,6 +120,16 @@ public class TestCopyMapper {
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void appendSourceData() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    for (Path source : pathList) {
+      if (fs.getFileStatus(source).isFile()) {
+        // non-file entries are skipped; each file grows by DEFAULT_FILE_SIZE * 2
+        appendFile(source, DEFAULT_FILE_SIZE * 2);
+      }
+    }
+  }
+
   private static void createSourceDataWithDifferentBlockSize() throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
@@ -201,85 +213,132 @@ public class TestCopyMapper {
     }
   }
 
+  /**
+   * Append {@code length} randomly generated bytes to the file at {@code p}.
+   */
+  private static void appendFile(Path p, int length) throws IOException {
+    byte[] toAppend = new byte[length];
+    Random random = new Random();
+    random.nextBytes(toAppend);
+    FSDataOutputStream out = cluster.getFileSystem().append(p);
+    try {
+      out.write(toAppend);
+    } finally {
+      IOUtils.closeStream(out); // runs whether or not the write succeeded
+    }
+  }
+
   @Test
   public void testCopyWithDifferentChecksumType() throws Exception {
     testCopy(true);
   }
 
   @Test(timeout=40000)
-  public void testRun() {
+  public void testRun() throws Exception {
     testCopy(false);
   }
 
-  private void testCopy(boolean preserveChecksum) {
-    try {
-      deleteState();
-      if (preserveChecksum) {
-        createSourceDataWithDifferentChecksumType();
-      } else {
-        createSourceData();
-      }
+  @Test
+  public void testCopyWithAppend() throws Exception {
+    final FileSystem fs = cluster.getFileSystem();
+    // do the first distcp
+    testCopy(false);
+    // start appending data to source
+    appendSourceData();
 
-      FileSystem fs = cluster.getFileSystem();
-      CopyMapper copyMapper = new CopyMapper();
-      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
-              = stubContext.getContext();
+    // do the distcp again with -update and -append option
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    // Enable append in the context's configuration
+    context.getConfiguration().setBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+    copyMapper.setup(context);
+    for (Path path: pathList) {
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+              new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+                  path)), context);
+    }
+
+    verifyCopy(fs, false);
+    // verify that we only copied the newly appended data
+    Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
+        .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+        .getValue());
+    Assert.assertEquals(pathList.size(), stubContext.getReporter().
+        getCounter(CopyMapper.Counter.COPY).getValue());
+  }
+
+  private void testCopy(boolean preserveChecksum) throws Exception {
+    deleteState();
+    if (preserveChecksum) {
+      createSourceDataWithDifferentChecksumType();
+    } else {
+      createSourceData();
+    }
 
-      Configuration configuration = context.getConfiguration();
-      EnumSet<DistCpOptions.FileAttribute> fileAttributes
-              = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
-      if (preserveChecksum) {
-        fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
-      }
-      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
-              DistCpUtils.packAttributes(fileAttributes));
+    FileSystem fs = cluster.getFileSystem();
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+            = stubContext.getContext();
+
+    Configuration configuration = context.getConfiguration();
+    EnumSet<DistCpOptions.FileAttribute> fileAttributes
+            = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+    if (preserveChecksum) {
+      fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+    }
+    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+            DistCpUtils.packAttributes(fileAttributes));
 
-      copyMapper.setup(context);
+    copyMapper.setup(context);
 
-      for (Path path: pathList) {
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                new CopyListingFileStatus(fs.getFileStatus(path)), context);
-      }
+    for (Path path: pathList) {
+      copyMapper.map(
+          new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+          new CopyListingFileStatus(fs.getFileStatus(path)), context);
+    }
 
-      // Check that the maps worked.
-      for (Path path : pathList) {
-        final Path targetPath = new Path(path.toString()
-                .replaceAll(SOURCE_PATH, TARGET_PATH));
-        Assert.assertTrue(fs.exists(targetPath));
-        Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
-        FileStatus sourceStatus = fs.getFileStatus(path);
-        FileStatus targetStatus = fs.getFileStatus(targetPath);
-        Assert.assertEquals(sourceStatus.getReplication(),
-            targetStatus.getReplication());
-        if (preserveChecksum) {
-          Assert.assertEquals(sourceStatus.getBlockSize(),
-              targetStatus.getBlockSize());
-        }
-        Assert.assertTrue(!fs.isFile(targetPath)
-            || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
-      }
-
-      Assert.assertEquals(pathList.size(),
-              stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      if (!preserveChecksum) {
-        Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      } else {
-        Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      }
-
-      testCopyingExistingFiles(fs, copyMapper, context);
-      for (Text value : stubContext.getWriter().values()) {
-        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
-      }
+    // Check that the maps worked.
+    verifyCopy(fs, preserveChecksum);
+    Assert.assertEquals(pathList.size(), stubContext.getReporter()
+        .getCounter(CopyMapper.Counter.COPY).getValue());
+    if (!preserveChecksum) {
+      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
+    } else {
+      Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
     }
-    catch (Exception e) {
-      LOG.error("Unexpected exception: ", e);
-      Assert.assertTrue(false);
+
+    testCopyingExistingFiles(fs, copyMapper, context);
+    for (Text value : stubContext.getWriter().values()) {
+      Assert.assertTrue(value.toString() + " is not skipped", value
+          .toString().startsWith("SKIP:"));
+    }
+  }
+
+  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+      throws Exception {
+    for (Path path : pathList) {
+      final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
+          TARGET_PATH));
+      Assert.assertTrue(fs.exists(targetPath));
+      Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+      FileStatus sourceStatus = fs.getFileStatus(path);
+      FileStatus targetStatus = fs.getFileStatus(targetPath);
+      Assert.assertEquals(sourceStatus.getReplication(),
+          targetStatus.getReplication());
+      if (preserveChecksum) {
+        Assert.assertEquals(sourceStatus.getBlockSize(),
+            targetStatus.getBlockSize());
+      }
+      Assert.assertTrue(!fs.isFile(targetPath)
+          || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java Thu May 22 18:50:20 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -48,8 +49,8 @@ public class TestRetriableFileCopyComman
     
     Exception actualEx = null;
     try {
-      new RetriableFileCopyCommand("testFailOnCloseError")
-        .copyBytes(stat, out, 512, context);
+      new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
+        .copyBytes(stat, 0, out, 512, context);
     } catch (Exception e) {
       actualEx = e;
     }