Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:07 UTC

svn commit: r1619012 [2/3] - in /hadoop/common/branches/HADOOP-10388: ./ hadoop-assemblies/src/main/resources/assemblies/ hadoop-client/ hadoop-dist/ hadoop-maven-plugins/ hadoop-maven-plugins/src/main/java/org/apache/hadoop/maven/plugin/protoc/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -44,7 +44,8 @@ import java.util.ArrayList;
  * that the total-number of bytes to be copied for each input split is
  * uniform.
  */
-public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
+public class UniformSizeInputFormat
+    extends InputFormat<Text, CopyListingFileStatus> {
   private static final Log LOG
                 = LogFactory.getLog(UniformSizeInputFormat.class);
 
@@ -76,7 +77,7 @@ public class UniformSizeInputFormat exte
     List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
     long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
 
-    FileStatus srcFileStatus = new FileStatus();
+    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
     Text srcRelPath = new Text();
     long currentSplitSize = 0;
     long lastSplitStart = 0;
@@ -161,9 +162,9 @@ public class UniformSizeInputFormat exte
    * @throws InterruptedException
    */
   @Override
-  public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
-                                                     TaskAttemptContext context)
-                                      throws IOException, InterruptedException {
-    return new SequenceFileRecordReader<Text, FileStatus>();
+  public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
   }
 }
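
Both getSplits() and createRecordReader() now traffic in CopyListingFileStatus values keyed by Text relative paths. As a minimal, hedged sketch of consuming such a copy-listing SequenceFile outside the job (the class name and the listing-path argument are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    public class ListingDump {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path listing = new Path(args[0]);  // path to a distcp copy-listing file
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(listing));
        try {
          Text relPath = new Text();
          CopyListingFileStatus status = new CopyListingFileStatus();
          while (reader.next(relPath, status)) {
            // Key: source-relative path; value: the listing record.
            System.out.println(relPath + "\t" + status.getLen() + " bytes");
          }
        } finally {
          reader.close();
        }
      }
    }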

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
@@ -90,7 +91,7 @@ class DynamicInputChunk<K, V> {
   private void openForWrite() throws IOException {
     writer = SequenceFile.createWriter(
             chunkFilePath.getFileSystem(configuration), configuration,
-            chunkFilePath, Text.class, FileStatus.class,
+            chunkFilePath, Text.class, CopyListingFileStatus.class,
             SequenceFile.CompressionType.NONE);
 
   }
@@ -117,7 +118,7 @@ class DynamicInputChunk<K, V> {
    * @param value Corresponding value from the listing file.
   * @throws IOException Exception on failure to write to the file.
    */
-  public void write(Text key, FileStatus value) throws IOException {
+  public void write(Text key, CopyListingFileStatus value) throws IOException {
     writer.append(key, value);
   }
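
A minimal round-trip sketch of the key/value types DynamicInputChunk now writes; the chunk path is hypothetical, and the createWriter/append calls mirror the ones above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    public class ChunkWriteSketch {
      public static void writeOneEntry(Configuration conf, Path chunkFile,
          Path source) throws Exception {
        FileSystem fs = chunkFile.getFileSystem(conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, chunkFile, Text.class, CopyListingFileStatus.class,
            SequenceFile.CompressionType.NONE);
        try {
          // Key is the source-relative path, value is the listing record.
          writer.append(new Text("/1"),
              new CopyListingFileStatus(fs.getFileStatus(source)));
        } finally {
          writer.close();
        }
      }
    }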
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java Tue Aug 19 23:49:39 2014
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 
 import java.util.List;
 import java.util.ArrayList;
@@ -57,7 +57,7 @@ public class DynamicInputFormat<K, V> ex
           = "mapred.num.splits";
   private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
           = "mapred.num.entries.per.chunk";
-
+  
   /**
    * Implementation of InputFormat::getSplits(). This method splits up the
    * copy-listing file into chunks, and assigns the first batch to different
@@ -91,7 +91,7 @@ public class DynamicInputFormat<K, V> ex
           // Setting non-zero length for FileSplit size, to avoid a possible
           // future when 0-sized file-splits are considered "empty" and skipped
           // over.
-          MIN_RECORDS_PER_CHUNK,
+          getMinRecordsPerChunk(jobContext.getConfiguration()),
           null));
     }
     DistCpUtils.publish(jobContext.getConfiguration(),
@@ -107,9 +107,11 @@ public class DynamicInputFormat<K, V> ex
     final Configuration configuration = context.getConfiguration();
     int numRecords = getNumberOfRecords(configuration);
     int numMaps = getNumMapTasks(configuration);
+    int maxChunksTolerable = getMaxChunksTolerable(configuration);
+
     // Number of chunks each map will process, on average.
     int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
-    validateNumChunksUsing(splitRatio, numMaps);
+    validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);
 
     int numEntriesPerChunk = (int)Math.ceil((float)numRecords
                                           /(splitRatio * numMaps));
@@ -131,7 +133,7 @@ public class DynamicInputFormat<K, V> ex
     
     List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
 
-    FileStatus fileStatus = new FileStatus();
+    CopyListingFileStatus fileStatus = new CopyListingFileStatus();
     Text relPath = new Text();
     int recordCounter = 0;
     int chunkCount = 0;
@@ -168,9 +170,9 @@ public class DynamicInputFormat<K, V> ex
     return chunksFinal;
   }
 
-  private static void validateNumChunksUsing(int splitRatio, int numMaps)
-                                              throws IOException {
-    if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+  private static void validateNumChunksUsing(int splitRatio, int numMaps,
+      int maxChunksTolerable) throws IOException {
+    if (splitRatio * numMaps > maxChunksTolerable)
       throw new IOException("Too many chunks created with splitRatio:"
                  + splitRatio + ", numMaps:" + numMaps
                  + ". Reduce numMaps or decrease split-ratio to proceed.");
@@ -238,14 +240,61 @@ public class DynamicInputFormat<K, V> ex
                                             int numMaps, int numPaths) {
     return configuration.getInt(
             CONF_LABEL_LISTING_SPLIT_RATIO,
-            getSplitRatio(numMaps, numPaths));
+            getSplitRatio(numMaps, numPaths, configuration));
+  }
+  
+  private static int getMaxChunksTolerable(Configuration conf) {
+    int maxChunksTolerable = conf.getInt(
+        DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE,
+        DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+    if (maxChunksTolerable <= 0) {
+      LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE +
+          " should be positive. Fall back to default value: "
+          + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+      maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT;
+    }
+    return maxChunksTolerable;
+  }
+  
+  private static int getMaxChunksIdeal(Configuration conf) {
+    int maxChunksIdeal = conf.getInt(
+        DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL,
+        DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+    if (maxChunksIdeal <= 0) {
+      LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL +
+          " should be positive. Fall back to default value: "
+          + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+      maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT;
+    }
+    return maxChunksIdeal;
+  }
+  
+  private static int getMinRecordsPerChunk(Configuration conf) {
+    int minRecordsPerChunk = conf.getInt(
+        DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK,
+        DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+    if (minRecordsPerChunk <= 0) {
+      LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK +
+          " should be positive. Fall back to default value: "
+          + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+      minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT;
+    }
+    return minRecordsPerChunk;
   }
 
-  private static final int MAX_CHUNKS_TOLERABLE = 400;
-  private static final int MAX_CHUNKS_IDEAL     = 100;
-  private static final int MIN_RECORDS_PER_CHUNK = 5;
-  private static final int SPLIT_RATIO_DEFAULT  = 2;
-
+  private static int getSplitRatio(Configuration conf) {
+    int splitRatio = conf.getInt(
+        DistCpConstants.CONF_LABEL_SPLIT_RATIO,
+        DistCpConstants.SPLIT_RATIO_DEFAULT);
+    if (splitRatio <= 0) {
+      LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO +
+          " should be positive. Fall back to default value: "
+          + DistCpConstants.SPLIT_RATIO_DEFAULT);
+      splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT;
+    }
+    return splitRatio;
+  }
+  
   /**
    * Package private, for testability.
    * @param nMaps The number of maps requested for.
@@ -253,19 +302,34 @@ public class DynamicInputFormat<K, V> ex
    * @return The number of splits each map should handle, ideally.
    */
   static int getSplitRatio(int nMaps, int nRecords) {
+    return getSplitRatio(nMaps, nRecords,new Configuration());
+  }
+  
+  /**
+   * Package private, for testability.
+   * @param nMaps The number of maps requested for.
+   * @param nRecords The number of records to be copied.
+   * @param conf The configuration set by users.
+   * @return The number of splits each map should handle, ideally.
+   */
+  static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
+    int maxChunksIdeal = getMaxChunksIdeal(conf);
+    int minRecordsPerChunk = getMinRecordsPerChunk(conf);
+    int splitRatio = getSplitRatio(conf);
+    
     if (nMaps == 1) {
       LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
       return 1;
     }
 
-    if (nMaps > MAX_CHUNKS_IDEAL)
-      return SPLIT_RATIO_DEFAULT;
+    if (nMaps > maxChunksIdeal)
+      return splitRatio;
 
-    int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+    int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
     int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
 
-    return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
-              SPLIT_RATIO_DEFAULT : nPickups;
+    return nRecordsPerChunk < minRecordsPerChunk ?
+              splitRatio : nPickups;
   }
 
   static int getNumEntriesPerChunk(Configuration configuration) {
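
The change above replaces the hard-coded limits (400 / 100 / 5 / 2) with values read from the configuration. A minimal sketch of overriding them via the DistCpConstants labels referenced by the new getters (the numbers are illustrative only; any non-positive value falls back to the defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpConstants;

    public class DynamicStrategyTuning {
      public static Configuration withCustomLimits() {
        Configuration conf = new Configuration();
        conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 800);
        conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 200);
        conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10);
        conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 3);
        return conf;
      }
    }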

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java Tue Aug 19 23:49:39 2014
@@ -25,16 +25,26 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
+import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.mapreduce.InputFormat;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.text.DecimalFormat;
 import java.net.URI;
 import java.net.InetAddress;
@@ -181,7 +191,7 @@ public class DistCpUtils {
    *                       change or any transient error)
    */
   public static void preserve(FileSystem targetFS, Path path,
-                              FileStatus srcFileStatus,
+                              CopyListingFileStatus srcFileStatus,
                               EnumSet<FileAttribute> attributes) throws IOException {
 
     FileStatus targetFileStatus = targetFS.getFileStatus(path);
@@ -189,10 +199,33 @@ public class DistCpUtils {
     String user = targetFileStatus.getOwner();
     boolean chown = false;
 
-    if (attributes.contains(FileAttribute.PERMISSION) &&
+    if (attributes.contains(FileAttribute.ACL)) {
+      List<AclEntry> srcAcl = srcFileStatus.getAclEntries();
+      List<AclEntry> targetAcl = getAcl(targetFS, targetFileStatus);
+      if (!srcAcl.equals(targetAcl)) {
+        targetFS.setAcl(path, srcAcl);
+      }
+      // setAcl can't preserve sticky bit, so also call setPermission if needed.
+      if (srcFileStatus.getPermission().getStickyBit() !=
+          targetFileStatus.getPermission().getStickyBit()) {
+        targetFS.setPermission(path, srcFileStatus.getPermission());
+      }
+    } else if (attributes.contains(FileAttribute.PERMISSION) &&
       !srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
       targetFS.setPermission(path, srcFileStatus.getPermission());
     }
+    
+    if (attributes.contains(FileAttribute.XATTR)) {
+      Map<String, byte[]> srcXAttrs = srcFileStatus.getXAttrs();
+      Map<String, byte[]> targetXAttrs = getXAttrs(targetFS, path);
+      if (!srcXAttrs.equals(targetXAttrs)) {
+        Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<String, byte[]> entry = iter.next();
+          targetFS.setXAttr(path, entry.getKey(), entry.getValue());
+        }
+      }
+    }
 
     if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() &&
         srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
@@ -217,6 +250,65 @@ public class DistCpUtils {
   }
 
   /**
+   * Returns a file's full logical ACL.
+   *
+   * @param fileSystem FileSystem containing the file
+   * @param fileStatus FileStatus of file
+   * @return List<AclEntry> containing full logical ACL
+   * @throws IOException if there is an I/O error
+   */
+  public static List<AclEntry> getAcl(FileSystem fileSystem,
+      FileStatus fileStatus) throws IOException {
+    List<AclEntry> entries = fileSystem.getAclStatus(fileStatus.getPath())
+      .getEntries();
+    return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries);
+  }
+  
+  /**
+   * Returns a file's all xAttrs.
+   * 
+   * @param fileSystem FileSystem containing the file
+   * @param path file path
+   * @return Map<String, byte[]> containing all xAttrs
+   * @throws IOException if there is an I/O error
+   */
+  public static Map<String, byte[]> getXAttrs(FileSystem fileSystem,
+      Path path) throws IOException {
+    return fileSystem.getXAttrs(path);
+  }
+
+  /**
+   * Converts a FileStatus to a CopyListingFileStatus.  If preserving ACLs,
+   * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
+   * populates the CopyListingFileStatus with the XAttrs.
+   *
+   * @param fileSystem FileSystem containing the file
+   * @param fileStatus FileStatus of file
+   * @param preserveAcls boolean true if preserving ACLs
+   * @param preserveXAttrs boolean true if preserving XAttrs
+   * @throws IOException if there is an I/O error
+   */
+  public static CopyListingFileStatus toCopyListingFileStatus(
+      FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, 
+      boolean preserveXAttrs) throws IOException {
+    CopyListingFileStatus copyListingFileStatus =
+      new CopyListingFileStatus(fileStatus);
+    if (preserveAcls) {
+      FsPermission perm = fileStatus.getPermission();
+      if (perm.getAclBit()) {
+        List<AclEntry> aclEntries = fileSystem.getAclStatus(
+          fileStatus.getPath()).getEntries();
+        copyListingFileStatus.setAclEntries(aclEntries);
+      }
+    }
+    if (preserveXAttrs) {
+      Map<String, byte[]> xAttrs = fileSystem.getXAttrs(fileStatus.getPath());
+      copyListingFileStatus.setXAttrs(xAttrs);
+    }
+    return copyListingFileStatus;
+  }
+
+  /**
   * Sort sequence file containing FileStatus and Text as key and value respectively
    *
    * @param fs - File System
@@ -227,7 +319,8 @@ public class DistCpUtils {
    */
   public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
       throws IOException {
-    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
+      CopyListingFileStatus.class, conf);
     Path output = new Path(sourceListing.toString() +  "_sorted");
 
     if (fs.exists(output)) {
@@ -239,6 +332,44 @@ public class DistCpUtils {
   }
 
   /**
+   * Determines if a file system supports ACLs by running a canary getAclStatus
+   * request on the file system root.  This method is used before distcp job
+   * submission to fail fast if the user requested preserving ACLs, but the file
+   * system cannot support ACLs.
+   *
+   * @param fs FileSystem to check
+   * @throws AclsNotSupportedException if fs does not support ACLs
+   */
+  public static void checkFileSystemAclSupport(FileSystem fs)
+      throws AclsNotSupportedException {
+    try {
+      fs.getAclStatus(new Path(Path.SEPARATOR));
+    } catch (Exception e) {
+      throw new AclsNotSupportedException("ACLs not supported for file system: "
+        + fs.getUri());
+    }
+  }
+  
+  /**
+   * Determines if a file system supports XAttrs by running a getXAttrs request
+   * on the file system root. This method is used before distcp job submission
+   * to fail fast if the user requested preserving XAttrs, but the file system
+   * cannot support XAttrs.
+   * 
+   * @param fs FileSystem to check
+   * @throws XAttrsNotSupportedException if fs does not support XAttrs
+   */
+  public static void checkFileSystemXAttrSupport(FileSystem fs)
+      throws XAttrsNotSupportedException {
+    try {
+      fs.getXAttrs(new Path(Path.SEPARATOR));
+    } catch (Exception e) {
+      throw new XAttrsNotSupportedException("XAttrs not supported for file system: "
+        + fs.getUri());
+    }
+  }
+
+  /**
    * String utility to convert a number-of-bytes to human readable format.
    */
   private static ThreadLocal<DecimalFormat> FORMATTER
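
A minimal sketch tying the new helpers together: probe ACL/XAttr support up front, then build a CopyListingFileStatus that carries both. The wrapper class is illustrative; the DistCpUtils calls are the ones added above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.CopyListingFileStatus;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class AclXAttrListingSketch {
      public static CopyListingFileStatus describe(Path path, Configuration conf)
          throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        // Fail fast, before job submission, if the file system cannot serve these.
        DistCpUtils.checkFileSystemAclSupport(fs);
        DistCpUtils.checkFileSystemXAttrSupport(fs);
        FileStatus stat = fs.getFileStatus(path);
        // preserveAcls = true, preserveXAttrs = true
        return DistCpUtils.toCopyListingFileStatus(fs, stat, true, true);
      }
    }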

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+
+import com.google.common.base.Preconditions;
+
 /**
  * The ThrottleInputStream provides bandwidth throttling on a specified
  * InputStream. It is implemented as a wrapper on top of another InputStream
@@ -90,6 +95,25 @@ public class ThrottledInputStream extend
     return readLen;
   }
 
+  /**
+   * Read bytes starting from the specified position. This requires rawStream is
+   * an instance of {@link PositionedReadable}.
+   */
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    if (!(rawStream instanceof PositionedReadable)) {
+      throw new UnsupportedOperationException(
+          "positioned read is not supported by the internal stream");
+    }
+    throttle();
+    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
+        offset, length);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
   private void throttle() throws IOException {
     if (getBytesPerSec() > maxBytesPerSec) {
       try {
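
A minimal usage sketch for the positioned read added above. It assumes the wrapped stream is an FSDataInputStream (which implements PositionedReadable); the path and offset handling are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.util.ThrottledInputStream;

    public class PositionedThrottledReadSketch {
      public static int readAt(Path path, long offset, byte[] buf,
          Configuration conf) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream raw = fs.open(path);
        try {
          ThrottledInputStream in = new ThrottledInputStream(raw);
          // Throttles first, then delegates to PositionedReadable.read(...).
          return in.read(offset, buf, 0, buf.length);
        } finally {
          raw.close();
        }
      }
    }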

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml Tue Aug 19 23:49:39 2014
@@ -1,8 +1,6 @@
 <?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!--
-  Copyright 2002-2004 The Apache Software Foundation
-
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
@@ -33,18 +32,19 @@ import java.io.IOException;
 public class StubContext {
 
   private StubStatusReporter reporter = new StubStatusReporter();
-  private RecordReader<Text, FileStatus> reader;
+  private RecordReader<Text, CopyListingFileStatus> reader;
   private StubInMemoryWriter writer = new StubInMemoryWriter();
-  private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+  private Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapperContext;
 
-  public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
-                     int taskId) throws IOException, InterruptedException {
+  public StubContext(Configuration conf,
+      RecordReader<Text, CopyListingFileStatus> reader, int taskId)
+      throws IOException, InterruptedException {
 
-    WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
-            = new WrappedMapper<Text, FileStatus, Text, Text>();
+    WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
+            = new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
 
-    MapContextImpl<Text, FileStatus, Text, Text> contextImpl
-            = new MapContextImpl<Text, FileStatus, Text, Text>(conf,
+    MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
+            = new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
             getTaskAttemptID(taskId), reader, writer,
             null, reporter, null);
 
@@ -52,7 +52,7 @@ public class StubContext {
     this.mapperContext = wrappedMapper.getMapContext(contextImpl);
   }
 
-  public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+  public Mapper<Text, CopyListingFileStatus, Text, Text>.Context getContext() {
     return mapperContext;
   }
 
@@ -60,7 +60,7 @@ public class StubContext {
     return reporter;
   }
 
-  public RecordReader<Text, FileStatus> getReader() {
+  public RecordReader<Text, CopyListingFileStatus> getReader() {
     return reader;
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
@@ -106,7 +105,7 @@ public class TestCopyListing extends Sim
     Assert.assertEquals(listing.getNumberOfPaths(), 3);
     SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
         SequenceFile.Reader.file(listingFile));
-    FileStatus fileStatus = new FileStatus();
+    CopyListingFileStatus fileStatus = new CopyListingFileStatus();
     Text relativePath = new Text();
     Assert.assertTrue(reader.next(relativePath, fileStatus));
     Assert.assertEquals(relativePath.toString(), "/1");
@@ -274,7 +273,7 @@ public class TestCopyListing extends Sim
 
       reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
 
-      FileStatus fileStatus = new FileStatus();
+      CopyListingFileStatus fileStatus = new CopyListingFileStatus();
       Text relativePath = new Text();
       Assert.assertTrue(reader.next(relativePath, fileStatus));
       Assert.assertTrue(relativePath.toString().equals(""));

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -531,7 +530,7 @@ public class TestFileBasedCopyListing {
                                             SequenceFile.Reader.file(listFile));
     try {
       Text relPath = new Text();
-      FileStatus fileStatus = new FileStatus();
+      CopyListingFileStatus fileStatus = new CopyListingFileStatus();
       while (reader.next(relPath, fileStatus)) {
         if (fileStatus.isDirectory() && relPath.toString().equals("")) {
           // ignore root with empty relPath, which is an entry to be 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java Tue Aug 19 23:49:39 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.tools;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -121,7 +120,7 @@ public class TestGlobbedCopyListing {
     SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
                                               listingPath, new Configuration());
     Text key   = new Text();
-    FileStatus value = new FileStatus();
+    CopyListingFileStatus value = new CopyListingFileStatus();
     Map<String, String> actualValues = new HashMap<String, String>();
     while (reader.next(key, value)) {
       if (value.isDirectory() && key.toString().equals("")) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Tue Aug 19 23:49:39 2014
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.tools;
 
+import static org.junit.Assert.fail;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.DistCpOptions.*;
 import org.apache.hadoop.conf.Configuration;
 
@@ -410,6 +413,8 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
         "-p",
@@ -421,6 +426,8 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
         "-pbr",
@@ -433,6 +440,8 @@ public class TestOptionsParser {
     Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
         "-pbrgup",
@@ -445,9 +454,11 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
-        "-pbrgupc",
+        "-pbrgupcax",
         "-f",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
@@ -457,6 +468,8 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
         "-pc",
@@ -469,6 +482,8 @@ public class TestOptionsParser {
     Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
 
     options = OptionsParser.parse(new String[] {
         "-p",
@@ -485,7 +500,7 @@ public class TestOptionsParser {
 
     try {
       OptionsParser.parse(new String[] {
-          "-pabc",
+          "-pabcd",
           "-f",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target"});
@@ -548,4 +563,45 @@ public class TestOptionsParser {
     Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
     Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
   }
+
+  @Test
+  public void testAppendOption() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+        "-append", "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/" });
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    // make sure -append is only valid when -update is specified
+    try {
+      options = OptionsParser.parse(new String[] { "-append",
+              "hdfs://localhost:8020/source/first",
+              "hdfs://localhost:8020/target/" });
+      fail("Append should fail if update option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is valid only with update options", e);
+    }
+
+    // make sure -append is invalid when skipCrc is specified
+    try {
+      options = OptionsParser.parse(new String[] {
+          "-append", "-update", "-skipcrccheck",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("Append should fail if skipCrc option is specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is disallowed when skipping CRC", e);
+    }
+  }
 }
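
A minimal sketch exercising the new -append flag programmatically, mirroring the assertions above (the URIs are the same example values used in the test; -append is accepted only together with -update and is rejected when -skipcrccheck is also given):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpOptionSwitch;
    import org.apache.hadoop.tools.DistCpOptions;
    import org.apache.hadoop.tools.OptionsParser;

    public class AppendOptionSketch {
      public static void main(String[] args) {
        DistCpOptions options = OptionsParser.parse(new String[] {
            "-update", "-append",
            "hdfs://localhost:8020/source/first",
            "hdfs://localhost:8020/target/" });
        Configuration conf = new Configuration();
        options.appendToConf(conf);
        // -append sets both the append and sync-folders labels, as asserted above.
        System.out.println("append: " + conf.getBoolean(
            DistCpOptionSwitch.APPEND.getConfigLabel(), false));
        System.out.println("sync folders: " + conf.getBoolean(
            DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
      }
    }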

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Tue Aug 19 23:49:39 2014
@@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -42,6 +44,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions;
@@ -117,6 +120,16 @@ public class TestCopyMapper {
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void appendSourceData() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    for (Path source : pathList) {
+      if (fs.getFileStatus(source).isFile()) {
+        // append 2048 bytes per file
+        appendFile(source, DEFAULT_FILE_SIZE * 2);
+      }
+    }
+  }
+
   private static void createSourceDataWithDifferentBlockSize() throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
@@ -200,95 +213,141 @@ public class TestCopyMapper {
     }
   }
 
+  /**
+   * Append specified length of bytes to a given file
+   */
+  private static void appendFile(Path p, int length) throws IOException {
+    byte[] toAppend = new byte[length];
+    Random random = new Random();
+    random.nextBytes(toAppend);
+    FSDataOutputStream out = cluster.getFileSystem().append(p);
+    try {
+      out.write(toAppend);
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+
   @Test
   public void testCopyWithDifferentChecksumType() throws Exception {
     testCopy(true);
   }
 
   @Test(timeout=40000)
-  public void testRun() {
+  public void testRun() throws Exception {
     testCopy(false);
   }
 
-  private void testCopy(boolean preserveChecksum) {
-    try {
-      deleteState();
-      if (preserveChecksum) {
-        createSourceDataWithDifferentChecksumType();
-      } else {
-        createSourceData();
-      }
+  @Test
+  public void testCopyWithAppend() throws Exception {
+    final FileSystem fs = cluster.getFileSystem();
+    // do the first distcp
+    testCopy(false);
+    // start appending data to source
+    appendSourceData();
 
-      FileSystem fs = cluster.getFileSystem();
-      CopyMapper copyMapper = new CopyMapper();
-      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
-              = stubContext.getContext();
+    // do the distcp again with -update and -append option
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    // Enable append 
+    context.getConfiguration().setBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+    copyMapper.setup(context);
+    for (Path path: pathList) {
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+              new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+                  path)), context);
+    }
+
+    verifyCopy(fs, false);
+    // verify that we only copied new appended data
+    Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
+        .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+        .getValue());
+    Assert.assertEquals(pathList.size(), stubContext.getReporter().
+        getCounter(CopyMapper.Counter.COPY).getValue());
+  }
+
+  private void testCopy(boolean preserveChecksum) throws Exception {
+    deleteState();
+    if (preserveChecksum) {
+      createSourceDataWithDifferentChecksumType();
+    } else {
+      createSourceData();
+    }
 
-      Configuration configuration = context.getConfiguration();
-      EnumSet<DistCpOptions.FileAttribute> fileAttributes
-              = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
-      if (preserveChecksum) {
-        fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
-      }
-      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
-              DistCpUtils.packAttributes(fileAttributes));
+    FileSystem fs = cluster.getFileSystem();
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+            = stubContext.getContext();
+
+    Configuration configuration = context.getConfiguration();
+    EnumSet<DistCpOptions.FileAttribute> fileAttributes
+            = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+    if (preserveChecksum) {
+      fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+    }
+    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+            DistCpUtils.packAttributes(fileAttributes));
 
-      copyMapper.setup(context);
+    copyMapper.setup(context);
 
-      for (Path path: pathList) {
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                fs.getFileStatus(path), context);
-      }
+    for (Path path: pathList) {
+      copyMapper.map(
+          new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+          new CopyListingFileStatus(fs.getFileStatus(path)), context);
+    }
 
-      // Check that the maps worked.
-      for (Path path : pathList) {
-        final Path targetPath = new Path(path.toString()
-                .replaceAll(SOURCE_PATH, TARGET_PATH));
-        Assert.assertTrue(fs.exists(targetPath));
-        Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
-        FileStatus sourceStatus = fs.getFileStatus(path);
-        FileStatus targetStatus = fs.getFileStatus(targetPath);
-        Assert.assertEquals(sourceStatus.getReplication(),
-            targetStatus.getReplication());
-        if (preserveChecksum) {
-          Assert.assertEquals(sourceStatus.getBlockSize(),
-              targetStatus.getBlockSize());
-        }
-        Assert.assertTrue(!fs.isFile(targetPath)
-            || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
-      }
-
-      Assert.assertEquals(pathList.size(),
-              stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      if (!preserveChecksum) {
-        Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      } else {
-        Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      }
-
-      testCopyingExistingFiles(fs, copyMapper, context);
-      for (Text value : stubContext.getWriter().values()) {
-        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
-      }
+    // Check that the maps worked.
+    verifyCopy(fs, preserveChecksum);
+    Assert.assertEquals(pathList.size(), stubContext.getReporter()
+        .getCounter(CopyMapper.Counter.COPY).getValue());
+    if (!preserveChecksum) {
+      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
+    } else {
+      Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
     }
-    catch (Exception e) {
-      LOG.error("Unexpected exception: ", e);
-      Assert.assertTrue(false);
+
+    testCopyingExistingFiles(fs, copyMapper, context);
+    for (Text value : stubContext.getWriter().values()) {
+      Assert.assertTrue(value.toString() + " is not skipped", value
+          .toString().startsWith("SKIP:"));
     }
   }
 
-  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
-                                        Mapper<Text, FileStatus, Text, Text>.Context context) {
+  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+      throws Exception {
+    for (Path path : pathList) {
+      final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
+          TARGET_PATH));
+      Assert.assertTrue(fs.exists(targetPath));
+      Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+      FileStatus sourceStatus = fs.getFileStatus(path);
+      FileStatus targetStatus = fs.getFileStatus(targetPath);
+      Assert.assertEquals(sourceStatus.getReplication(),
+          targetStatus.getReplication());
+      if (preserveChecksum) {
+        Assert.assertEquals(sourceStatus.getBlockSize(),
+            targetStatus.getBlockSize());
+      }
+      Assert.assertTrue(!fs.isFile(targetPath)
+          || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
+    }
+  }
 
+  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
     try {
       for (Path path : pathList) {
         copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                fs.getFileStatus(path), context);
+                new CopyListingFileStatus(fs.getFileStatus(path)), context);
       }
 
       Assert.assertEquals(nFiles,
@@ -309,7 +368,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       Configuration configuration = context.getConfiguration();
@@ -320,7 +379,7 @@ public class TestCopyMapper {
       copyMapper.setup(context);
 
       copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
-              fs.getFileStatus(pathList.get(0)), context);
+              new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
 
       Assert.assertTrue("There should have been an exception.", false);
     }
@@ -343,7 +402,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       mkdirs(SOURCE_PATH + "/src/file");
@@ -351,7 +410,8 @@ public class TestCopyMapper {
       try {
         copyMapper.setup(context);
         copyMapper.map(new Text("/src/file"),
-            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            new CopyListingFileStatus(fs.getFileStatus(
+              new Path(SOURCE_PATH + "/src/file"))),
             context);
       } catch (IOException e) {
         Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@@ -372,22 +432,25 @@ public class TestCopyMapper {
 
       final CopyMapper copyMapper = new CopyMapper();
 
-      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
-          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
-        @Override
-        public Mapper<Text, FileStatus, Text, Text>.Context run() {
-          try {
-            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-            return stubContext.getContext();
-          } catch (Exception e) {
-            LOG.error("Exception encountered ", e);
-            throw new RuntimeException(e);
-          }
-        }
-      });
+      final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        tmpUser.doAs(
+          new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
+            @Override
+            public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
+              try {
+                StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+                return stubContext.getContext();
+              } catch (Exception e) {
+                LOG.error("Exception encountered ", e);
+                throw new RuntimeException(e);
+              }
+            }
+          });
 
       EnumSet<DistCpOptions.FileAttribute> preserveStatus =
           EnumSet.allOf(DistCpOptions.FileAttribute.class);
+      preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+      preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
 
       context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
         DistCpUtils.packAttributes(preserveStatus));
@@ -415,7 +478,8 @@ public class TestCopyMapper {
           try {
             copyMapper.setup(context);
             copyMapper.map(new Text("/src/file"),
-                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                new CopyListingFileStatus(tmpFS.getFileStatus(
+                  new Path(SOURCE_PATH + "/src/file"))),
                 context);
             Assert.fail("Expected copy to fail");
           } catch (AccessControlException e) {
@@ -442,19 +506,20 @@ public class TestCopyMapper {
 
       final CopyMapper copyMapper = new CopyMapper();
 
-      final Mapper<Text, FileStatus, Text, Text>.Context context =  tmpUser.
-          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
-        @Override
-        public Mapper<Text, FileStatus, Text, Text>.Context run() {
-          try {
-            StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-            return stubContext.getContext();
-          } catch (Exception e) {
-            LOG.error("Exception encountered ", e);
-            throw new RuntimeException(e);
-          }
-        }
-      });
+      final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        tmpUser.doAs(
+          new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
+            @Override
+            public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
+              try {
+                StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+                return stubContext.getContext();
+              } catch (Exception e) {
+                LOG.error("Exception encountered ", e);
+                throw new RuntimeException(e);
+              }
+            }
+          });
 
       touchFile(SOURCE_PATH + "/src/file");
       mkdirs(TARGET_PATH);
@@ -481,7 +546,8 @@ public class TestCopyMapper {
           try {
             copyMapper.setup(context);
             copyMapper.map(new Text("/src/file"),
-                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                new CopyListingFileStatus(tmpFS.getFileStatus(
+                  new Path(SOURCE_PATH + "/src/file"))),
                 context);
           } catch (Exception e) {
             throw new RuntimeException(e);
@@ -518,9 +584,12 @@ public class TestCopyMapper {
         }
       });
 
-      final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+      final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
       EnumSet<DistCpOptions.FileAttribute> preserveStatus =
           EnumSet.allOf(DistCpOptions.FileAttribute.class);
+      preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+      preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
 
       context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
         DistCpUtils.packAttributes(preserveStatus));
@@ -551,7 +620,8 @@ public class TestCopyMapper {
           try {
             copyMapper.setup(context);
             copyMapper.map(new Text("/src/file"),
-                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                new CopyListingFileStatus(tmpFS.getFileStatus(
+                  new Path(SOURCE_PATH + "/src/file"))),
                 context);
             Assert.assertEquals(stubContext.getWriter().values().size(), 1);
             Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
@@ -594,8 +664,10 @@ public class TestCopyMapper {
 
       EnumSet<DistCpOptions.FileAttribute> preserveStatus =
           EnumSet.allOf(DistCpOptions.FileAttribute.class);
+      preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
+      preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
 
-      final Mapper<Text, FileStatus, Text, Text>.Context context
+      final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
@@ -629,7 +701,8 @@ public class TestCopyMapper {
           try {
             copyMapper.setup(context);
             copyMapper.map(new Text("/src/file"),
-                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+                new CopyListingFileStatus(tmpFS.getFileStatus(
+                  new Path(SOURCE_PATH + "/src/file"))),
                 context);
             Assert.fail("Didn't expect the file to be copied");
           } catch (AccessControlException ignore) {
@@ -661,7 +734,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       touchFile(SOURCE_PATH + "/src/file");
@@ -669,7 +742,8 @@ public class TestCopyMapper {
       try {
         copyMapper.setup(context);
         copyMapper.map(new Text("/src/file"),
-            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
+            new CopyListingFileStatus(fs.getFileStatus(
+              new Path(SOURCE_PATH + "/src/file"))),
             context);
       } catch (IOException e) {
         Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@@ -688,7 +762,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       Configuration configuration = context.getConfiguration();
@@ -705,7 +779,7 @@ public class TestCopyMapper {
         if (!fileStatus.isDirectory()) {
           fs.delete(path, true);
           copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                  fileStatus, context);
+                  new CopyListingFileStatus(fileStatus), context);
         }
       }
       if (ignoreFailures) {
@@ -745,7 +819,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
           = stubContext.getContext();
 
       Configuration configuration = context.getConfiguration();
@@ -759,7 +833,7 @@ public class TestCopyMapper {
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
         copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-            fileStatus, context);
+            new CopyListingFileStatus(fileStatus), context);
       }
 
       Assert.fail("Copy should have failed because of block-size difference.");
@@ -780,7 +854,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       Configuration configuration = context.getConfiguration();
@@ -798,7 +872,7 @@ public class TestCopyMapper {
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
         copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                fileStatus, context);
+                new CopyListingFileStatus(fileStatus), context);
       }
 
       // Check that the block-size/replication aren't preserved.
@@ -855,7 +929,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       context.getConfiguration().set(
@@ -863,7 +937,8 @@ public class TestCopyMapper {
               targetFilePath.getParent().toString()); // Parent directory.
       copyMapper.setup(context);
 
-      final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
+      final CopyListingFileStatus sourceFileStatus = new CopyListingFileStatus(
+        fs.getFileStatus(sourceFilePath));
 
       long before = fs.getFileStatus(targetFilePath).getModificationTime();
       copyMapper.map(new Text(DistCpUtils.getRelativePath(
@@ -907,7 +982,7 @@ public class TestCopyMapper {
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, FileStatus, Text, Text>.Context context
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
               = stubContext.getContext();
 
       Configuration configuration = context.getConfiguration();
@@ -926,7 +1001,7 @@ public class TestCopyMapper {
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
         copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                fileStatus, context);
+                new CopyListingFileStatus(fileStatus), context);
       }
 
       // Check that the user/group attributes are preserved

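Note on the TestCopyMapper hunks above: CopyMapper's input value type has changed from FileStatus to CopyListingFileStatus, so every raw FileStatus obtained from the filesystem is wrapped before being handed to map(). A minimal sketch of that wrapping, relying only on the CopyListingFileStatus(FileStatus) constructor visible in the diff (the helper method name is illustrative):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    class CopyListingStatusSketch {
      // Wrap a raw FileStatus into the value type CopyMapper now consumes.
      static CopyListingFileStatus toListingStatus(FileSystem fs, Path path)
          throws java.io.IOException {
        FileStatus raw = fs.getFileStatus(path);
        return new CopyListingFileStatus(raw);
      }
    }
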
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -48,8 +49,8 @@ public class TestRetriableFileCopyComman
     
     Exception actualEx = null;
     try {
-      new RetriableFileCopyCommand("testFailOnCloseError")
-        .copyBytes(stat, out, 512, context);
+      new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
+        .copyBytes(stat, 0, out, 512, context);
     } catch (Exception e) {
       actualEx = e;
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.StubContext;
 import org.apache.hadoop.security.Credentials;
@@ -122,8 +123,8 @@ public class TestUniformSizeInputFormat 
     for (int i=0; i<splits.size(); ++i) {
       InputSplit split = splits.get(i);
       int currentSplitSize = 0;
-      RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
-              split, null);
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformSizeInputFormat.createRecordReader(split, null);
       StubContext stubContext = new StubContext(jobContext.getConfiguration(),
                                                 recordReader, 0);
       final TaskAttemptContext taskAttemptContext
@@ -168,7 +169,7 @@ public class TestUniformSizeInputFormat 
 
     try {
       reader.seek(lastEnd);
-      FileStatus srcFileStatus = new FileStatus();
+      CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
       Text srcRelPath = new Text();
       Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
     } finally {

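Note on the TestUniformSizeInputFormat hunks above: both the record reader and the direct SequenceFile reads now use CopyListingFileStatus as the value type. A minimal sketch of scanning a copy listing with the new type, assuming a SequenceFile of <Text, CopyListingFileStatus> pairs (the listingPath parameter is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    class ListingScanSketch {
      static void dump(Configuration conf, Path listingPath) throws Exception {
        SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(listingPath));
        try {
          Text relPath = new Text();
          CopyListingFileStatus status = new CopyListingFileStatus();
          // next() deserializes each entry into the reusable key/value instances.
          while (reader.next(relPath, status)) {
            System.out.println(relPath + " -> " + status.getPath());
          }
        } finally {
          IOUtils.closeStream(reader);
        }
      }
    }
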
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java Tue Aug 19 23:49:39 2014
@@ -18,19 +18,20 @@
 
 package org.apache.hadoop.tools.mapred.lib;
 
-import junit.framework.Assert;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.StubContext;
 import org.apache.hadoop.security.Credentials;
@@ -117,15 +118,15 @@ public class TestDynamicInputFormat {
                     +"/tmp/testDynInputFormat/fileList.seq"), options);
 
     JobContext jobContext = new JobContextImpl(configuration, new JobID());
-    DynamicInputFormat<Text, FileStatus> inputFormat =
-        new DynamicInputFormat<Text, FileStatus>();
+    DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =
+        new DynamicInputFormat<Text, CopyListingFileStatus>();
     List<InputSplit> splits = inputFormat.getSplits(jobContext);
 
     int nFiles = 0;
     int taskId = 0;
 
     for (InputSplit split : splits) {
-      RecordReader<Text, FileStatus> recordReader =
+      RecordReader<Text, CopyListingFileStatus> recordReader =
            inputFormat.createRecordReader(split, null);
       StubContext stubContext = new StubContext(jobContext.getConfiguration(),
                                                 recordReader, taskId);
@@ -135,7 +136,7 @@ public class TestDynamicInputFormat {
       recordReader.initialize(splits.get(0), taskAttemptContext);
       float previousProgressValue = 0f;
       while (recordReader.nextKeyValue()) {
-        FileStatus fileStatus = recordReader.getCurrentValue();
+        CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
         String source = fileStatus.getPath().toString();
         System.out.println(source);
         Assert.assertTrue(expectedFilePaths.contains(source));
@@ -160,5 +161,25 @@ public class TestDynamicInputFormat {
     Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
     Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
     Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+
+    // Tests with negative value configuration
+    Configuration conf = new Configuration();
+    conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, -1);
+    conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, -1);
+    conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, -1);
+    conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1);
+    Assert.assertEquals(1,
+        DynamicInputFormat.getSplitRatio(1, 1000000000, conf));
+    Assert.assertEquals(2,
+        DynamicInputFormat.getSplitRatio(11000000, 10, conf));
+    Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700, conf));
+    Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200, conf));
+
+    // Tests with valid configuration
+    conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 100);
+    conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 30);
+    conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10);
+    conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
+    Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Assert;
@@ -106,7 +107,8 @@ public class TestDistCpUtils {
       Path src = new Path("/tmp/src");
       fs.mkdirs(path);
       fs.mkdirs(src);
-      FileStatus srcStatus = fs.getFileStatus(src);
+      CopyListingFileStatus srcStatus = new CopyListingFileStatus(
+        fs.getFileStatus(src));
 
       FsPermission noPerm = new FsPermission((short) 0);
       fs.setPermission(path, noPerm);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml Tue Aug 19 23:49:39 2014
@@ -1,8 +1,6 @@
 <?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!--
-  Copyright 2002-2004 The Apache Software Foundation
-
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java Tue Aug 19 23:49:39 2014
@@ -376,7 +376,7 @@ public class TestGridMixClasses {
   /*
    * test LoadSortComparator
    */
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testLoadJobLoadSortComparator() throws Exception {
     LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator();
 
@@ -409,7 +409,7 @@ public class TestGridMixClasses {
   /*
    * test SpecGroupingComparator
    */
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testGridmixJobSpecGroupingComparator() throws Exception {
     GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator();
 
@@ -452,7 +452,7 @@ public class TestGridMixClasses {
   /*
    * test CompareGridmixJob only equals and compare
    */
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testCompareGridmixJob() throws Exception {
     Configuration conf = new Configuration();
     Path outRoot = new Path("target");
@@ -478,7 +478,7 @@ public class TestGridMixClasses {
   /*
    * test ReadRecordFactory. should read all data from inputstream
    */
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testReadRecordFactory() throws Exception {
 
     // RecordFactory factory, InputStream src, Configuration conf
@@ -589,7 +589,7 @@ public class TestGridMixClasses {
   /*
    * test LoadRecordReader. It class reads data from some files.
    */
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testLoadJobLoadRecordReader() throws Exception {
     LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
     Configuration conf = new Configuration();
@@ -652,7 +652,7 @@ public class TestGridMixClasses {
    * test LoadReducer
    */
 
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testLoadJobLoadReducer() throws Exception {
     LoadJob.LoadReducer test = new LoadJob.LoadReducer();
 
@@ -772,7 +772,7 @@ public class TestGridMixClasses {
   /*
    * test SerialJobFactory
    */
-  @Test (timeout=40000)
+  @Test (timeout=120000)
   public void testSerialReaderThread() throws Exception {
 
     Configuration conf = new Configuration();
@@ -833,7 +833,7 @@ public class TestGridMixClasses {
    * test SleepMapper
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  @Test (timeout=10000)
+  @Test (timeout=30000)
   public void testSleepMapper() throws Exception {
     SleepJob.SleepMapper test = new SleepJob.SleepMapper();
 
@@ -878,7 +878,7 @@ public class TestGridMixClasses {
   /*
    * test SleepReducer
    */
-  @Test (timeout=1000)
+  @Test (timeout=3000)
   public void testSleepReducer() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(JobContext.NUM_REDUCES, 2);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Tue Aug 19 23:49:39 2014
@@ -140,10 +140,10 @@ public class TestGridmixRecord {
       final int chk = WritableComparator.compareBytes(
           out1.getData(), 0, out1.getLength(),
           out2.getData(), 0, out2.getLength());
-      assertEquals(chk, x.compareTo(y));
-      assertEquals(chk, cmp.compare(
+      assertEquals(Integer.signum(chk), Integer.signum(x.compareTo(y)));
+      assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(
             out1.getData(), 0, out1.getLength(),
-            out2.getData(), 0, out2.getLength()));
+            out2.getData(), 0, out2.getLength())));
       // write second copy, compare eq
       final int s1 = out1.getLength();
       x.write(out1);
@@ -153,8 +153,8 @@ public class TestGridmixRecord {
       y.write(out2);
       assertEquals(0, cmp.compare(out2.getData(), 0, s2,
             out2.getData(), s2, out2.getLength() - s2));
-      assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
-            out2.getData(), s2, out2.getLength() - s2));
+      assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(out1.getData(), 0, s1,
+            out2.getData(), s2, out2.getLength() - s2)));
     }
   }
 

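Note on the TestGridmixRecord hunks above: the assertions are relaxed to compare Integer.signum of the results, because the Comparable and RawComparator contracts only guarantee the sign of the returned value, not its magnitude; two correct comparisons of the same inputs may return different negative (or positive) numbers. A small stand-alone illustration:

    class SignumSketch {
      public static void main(String[] args) {
        int a = Integer.compare(3, 7);   // -1: one valid "less than" result
        int b = 3 - 7;                   // -4: another valid "less than" result
        System.out.println(a == b);                                   // false
        System.out.println(Integer.signum(a) == Integer.signum(b));   // true
      }
    }
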
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/pom.xml Tue Aug 19 23:49:39 2014
@@ -146,6 +146,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <scope>test</scope>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/StrictBufferedFSInputStream.java Tue Aug 19 23:49:39 2014
@@ -19,9 +19,11 @@
 package org.apache.hadoop.fs.swift.snative;
 
 import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.swift.exceptions.SwiftConnectionClosedException;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 /**
@@ -37,10 +39,10 @@ public class StrictBufferedFSInputStream
   @Override
   public void seek(long pos) throws IOException {
     if (pos < 0) {
-      throw new IOException("Negative position");
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
     }
     if (in == null) {
-      throw new SwiftConnectionClosedException("Stream closed");
+      throw new SwiftConnectionClosedException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
     super.seek(pos);
   }

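Note on the StrictBufferedFSInputStream hunk above: a negative seek position now raises EOFException carrying the shared FSExceptionMessages.NEGATIVE_SEEK text instead of a generic IOException, and a closed stream reports the shared stream-closed message. A sketch of the caller-visible difference, assuming only that EOFException is a subclass of IOException (the helper method is illustrative):

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    class NegativeSeekSketch {
      static boolean rejectsNegativeSeek(FSDataInputStream in) {
        try {
          in.seek(-1);
          return false;               // should not be reached
        } catch (EOFException expected) {
          return true;                // negative seek rejected, as specified
        } catch (IOException other) {
          return false;               // older behaviour: generic IOException
        }
      }
    }
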
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem.java Tue Aug 19 23:49:39 2014
@@ -25,14 +25,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException;
-import org.apache.hadoop.fs.swift.exceptions.SwiftNotDirectoryException;
 import org.apache.hadoop.fs.swift.exceptions.SwiftOperationFailedException;
-import org.apache.hadoop.fs.swift.exceptions.SwiftPathExistsException;
 import org.apache.hadoop.fs.swift.exceptions.SwiftUnsupportedFeatureException;
 import org.apache.hadoop.fs.swift.http.SwiftProtocolConstants;
 import org.apache.hadoop.fs.swift.util.DurationStats;
@@ -373,7 +373,7 @@ public class SwiftNativeFileSystem exten
    * @param directory path to query
    * @return true iff the directory should be created
    * @throws IOException IO problems
-   * @throws SwiftNotDirectoryException if the path references a file
+   * @throws ParentNotDirectoryException if the path references a file
    */
   private boolean shouldCreate(Path directory) throws IOException {
     FileStatus fileStatus;
@@ -388,9 +388,9 @@ public class SwiftNativeFileSystem exten
 
       if (!SwiftUtils.isDirectory(fileStatus)) {
         //if it's a file, raise an error
-        throw new SwiftNotDirectoryException(directory,
-                String.format(": can't mkdir since it exists and is not a directory: %s",
-                        fileStatus));
+        throw new ParentNotDirectoryException(
+                String.format("%s: can't mkdir since it exists and is not a directory: %s",
+                    directory, fileStatus));
       } else {
         //path exists, and it is a directory
         if (LOG.isDebugEnabled()) {
@@ -488,7 +488,7 @@ public class SwiftNativeFileSystem exten
         //overwrite set -> delete the object.
         store.delete(absolutePath, true);
       } else {
-        throw new SwiftPathExistsException("Path exists: " + file);
+        throw new FileAlreadyExistsException("Path exists: " + file);
       }
     } else {
       // destination does not exist -trigger creation of the parent
@@ -580,6 +580,9 @@ public class SwiftNativeFileSystem exten
     } catch (SwiftOperationFailedException e) {
       //downgrade to a failure
       return false;
+    } catch (FileAlreadyExistsException e) {
+      //downgrade to a failure
+      return false;
     } catch (FileNotFoundException e) {
       //downgrade to a failure
       return false;

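Note on the SwiftNativeFileSystem rename hunk above: the store layer now reports rename-onto-an-existing-file with FileAlreadyExistsException (see the SwiftNativeFileSystemStore hunks below), so the filesystem-level rename adds a catch that downgrades it to a boolean failure, preserving FileSystem#rename's return-false-on-failure contract. A minimal sketch of that shape, with the Store interface standing in for the real native store:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileAlreadyExistsException;

    class RenameDowngradeSketch {
      interface Store {
        void rename(String src, String dst) throws IOException;
      }

      static boolean rename(Store store, String src, String dst) throws IOException {
        try {
          store.rename(src, dst);
          return true;
        } catch (FileAlreadyExistsException e) {
          return false;   // destination already exists: report failure, do not throw
        } catch (FileNotFoundException e) {
          return false;   // source missing: report failure, do not throw
        }
      }
    }
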
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystemStore.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import org.apache.commons.httpclient.Htt
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException;
@@ -590,7 +591,7 @@ public class SwiftNativeFileSystemStore 
         } else {
           //outcome #1 dest it's a file: fail if differeent
           if (!renamingOnToSelf) {
-            throw new SwiftOperationFailedException(
+            throw new FileAlreadyExistsException(
                     "cannot rename a file over one that already exists");
           } else {
             //is mv self self where self is a file. this becomes a no-op
@@ -633,7 +634,7 @@ public class SwiftNativeFileSystemStore 
 
       if (destExists && !destIsDir) {
         // #1 destination is a file: fail
-        throw new SwiftOperationFailedException(
+        throw new FileAlreadyExistsException(
                 "the source is a directory, but not the destination");
       }
       Path targetPath;
@@ -927,7 +928,7 @@ public class SwiftNativeFileSystemStore 
     }
 
     if (LOG.isDebugEnabled()) {
-      SwiftUtils.debug(LOG, SwiftUtils.fileStatsToString(statuses, "\n"));
+      SwiftUtils.debug(LOG, "%s", SwiftUtils.fileStatsToString(statuses, "\n"));
     }
 
     if (filecount == 1 && swiftPath.equals(statuses[0].getPath())) {

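Note on the last SwiftNativeFileSystemStore hunk above: the debug call is a format-string fix. The file listing is now passed as an argument to a literal "%s" format instead of being used as the format itself, so a path containing '%' can no longer make String.format throw inside the debug helper. A stand-alone illustration of the failure mode (the debug method here is a stand-in for SwiftUtils.debug):

    class FormatStringSketch {
      static void debug(String format, Object... args) {
        System.out.println(String.format(format, args));
      }

      public static void main(String[] args) {
        String listing = "/data/file%20name";   // '%' as it appears in a URL-encoded path
        debug("%s", listing);                    // safe: listing is only an argument
        // debug(listing);                       // unsafe: "%20n" is parsed as a format
        //                                       // specifier and String.format throws
      }
    }
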
Modified: hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.java Tue Aug 19 23:49:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.swift.snati
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -298,7 +299,8 @@ class SwiftNativeInputStream extends FSI
   @Override
   public synchronized void seek(long targetPos) throws IOException {
     if (targetPos < 0) {
-      throw new IOException("Negative Seek offset not supported");
+      throw new EOFException(
+          FSExceptionMessages.NEGATIVE_SEEK);
     }
     //there's some special handling of near-local data
     //as the seek can be omitted if it is in/adjacent