You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC

svn commit: r1236045 [3/5] - in /hadoop/common/trunk: hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/ hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The DynamicRecordReader is used in conjunction with the DynamicInputFormat
+ * to implement the "Worker pattern" for DistCp.
+ * The DynamicRecordReader is responsible for:
+ * 1. Presenting the contents of each chunk to DistCp's mapper.
+ * 2. Acquiring a new chunk when the current chunk has been completely consumed,
+ *    transparently.
+ */
+public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
+  private static final Log LOG = LogFactory.getLog(DynamicRecordReader.class);
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration configuration;
+  private DynamicInputChunk<K, V> chunk;
+  private TaskID taskId;
+
+  // Data required for progress indication.
+  private int numRecordsPerChunk; // Constant per job.
+  private int totalNumRecords;    // Constant per job.
+  private int numRecordsProcessedByThisMap = 0;
+  private long timeOfLastChunkDirScan = 0;
+  private boolean isChunkDirAlreadyScanned = false;
+
+  private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
+
+  /**
+   * Implementation for RecordReader::initialize(). Initializes the internal
+   * RecordReader to read from chunks.
+   * @param inputSplit The InputSplit for the map. Ignored entirely.
+   * @param taskAttemptContext The AttemptContext.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public void initialize(InputSplit inputSplit,
+                         TaskAttemptContext taskAttemptContext)
+                         throws IOException, InterruptedException {
+    numRecordsPerChunk = DynamicInputFormat.getNumEntriesPerChunk(
+            taskAttemptContext.getConfiguration());
+    this.taskAttemptContext = taskAttemptContext;
+    configuration = taskAttemptContext.getConfiguration();
+    taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
+    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    timeOfLastChunkDirScan = System.currentTimeMillis();
+    isChunkDirAlreadyScanned = false;
+
+    totalNumRecords = getTotalNumRecords();
+
+  }
+
+  private int getTotalNumRecords() {
+    return DistCpUtils.getInt(configuration,
+                              DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+  }
+
+  /**
+   * Implementation of RecordReader::nextKeyValue().
+   * Advances to the next record of the current chunk. When the current chunk
+   * has been completely exhausted, it is released and a new chunk is
+   * acquired and read, transparently.
+   * @return True, if another record could be advanced to. False, otherwise.
+   * @throws IOException on failure.
+   * @throws InterruptedException as declared by RecordReader.
+   */
+  @Override
+  public boolean nextKeyValue()
+      throws IOException, InterruptedException {
+
+    if (chunk == null) {
+      if (LOG.isDebugEnabled())
+        LOG.debug(taskId + ": RecordReader is null. No records to be read.");
+      return false;
+    }
+
+    if (chunk.getReader().nextKeyValue()) {
+      ++numRecordsProcessedByThisMap;
+      return true;
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug(taskId + ": Current chunk exhausted. " +
+                         " Attempting to pick up new one.");
+
+    chunk.release();
+    timeOfLastChunkDirScan = System.currentTimeMillis();
+    isChunkDirAlreadyScanned = false;
+    
+    chunk = DynamicInputChunk.acquire(taskAttemptContext);
+
+    if (chunk == null) return false;
+
+    if (chunk.getReader().nextKeyValue()) {
+      ++numRecordsProcessedByThisMap;
+      return true;
+    }
+    else {
+      return false;
+    }
+  }
+
+  /**
+   * Implementation of RecordReader::getCurrentKey().
+   * @return The key of the current record. (i.e. the source-path.)
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public K getCurrentKey()
+      throws IOException, InterruptedException {
+    return chunk.getReader().getCurrentKey();
+  }
+
+  /**
+   * Implementation of RecordReader::getCurrentValue().
+   * @return The value of the current record. (i.e. the target-path.)
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public V getCurrentValue()
+      throws IOException, InterruptedException {
+    return chunk.getReader().getCurrentValue();
+  }
+
+  /**
+   * Implementation of RecordReader::getProgress().
+   * @return A fraction [0.0,1.0] indicating the progress of a DistCp mapper.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public float getProgress()
+      throws IOException, InterruptedException {
+    final int numChunksLeft = getNumChunksLeft();
+    if (numChunksLeft < 0) {// Un-initialized. i.e. Before 1st dir-scan.
+      assert numRecordsProcessedByThisMap <= numRecordsPerChunk
+              : "numRecordsProcessedByThisMap:" + numRecordsProcessedByThisMap +
+                " exceeds numRecordsPerChunk:" + numRecordsPerChunk;
+      return ((float) numRecordsProcessedByThisMap) / totalNumRecords;
+      // Conservative estimate, till the first directory scan.
+    }
+
+    return ((float) numRecordsProcessedByThisMap)
+            /(numRecordsProcessedByThisMap + numRecordsPerChunk*numChunksLeft);
+  }
+
+  private int getNumChunksLeft() throws IOException {
+    long now = System.currentTimeMillis();
+    boolean tooLongSinceLastDirScan
+                  = now - timeOfLastChunkDirScan > TIME_THRESHOLD_FOR_DIR_SCANS;
+
+    if (tooLongSinceLastDirScan
+            || (!isChunkDirAlreadyScanned &&
+                    numRecordsProcessedByThisMap%numRecordsPerChunk
+                              > numRecordsPerChunk/2)) {
+      DynamicInputChunk.getListOfChunkFiles();
+      isChunkDirAlreadyScanned = true;
+      timeOfLastChunkDirScan = now;
+    }
+
+    return DynamicInputChunk.getNumChunksLeft();
+  }
+  /**
+   * Implementation of RecordReader::close().
+   * Closes the RecordReader.
+   * @throws IOException, on failure.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    if (chunk != null)
+        chunk.close();
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Locale;
+import java.text.DecimalFormat;
+import java.net.URI;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Utility functions used in DistCp.
+ */
+public class DistCpUtils {
+
+  private static final Log LOG = LogFactory.getLog(DistCpUtils.class);
+
+  /**
+   * Retrieves size of the file at the specified path.
+   * @param path The path of the file whose size is sought.
+   * @param configuration Configuration, to retrieve the appropriate FileSystem.
+   * @return The file-size, in number of bytes.
+   * @throws IOException, on failure.
+   */
+  public static long getFileSize(Path path, Configuration configuration)
+                                            throws IOException {
+    if (LOG.isDebugEnabled())
+      LOG.debug("Retrieving file size for: " + path);
+    return path.getFileSystem(configuration).getFileStatus(path).getLen();
+  }
+
+  /**
+   * Utility to publish a value to a configuration.
+   * @param configuration The Configuration to which the value must be written.
+   * @param label The label for the value being published.
+   * @param value The value being published.
+   * @param <T> The type of the value.
+   */
+  public static <T> void publish(Configuration configuration,
+                                 String label, T value) {
+    configuration.set(label, String.valueOf(value));
+  }
+
+  /**
+   * Utility to retrieve a specified key from a Configuration. Fails an
+   * assertion (only when assertions are enabled) if the value is negative.
+   * @param configuration The Configuration in which the key is sought.
+   * @param label The key being sought.
+   * @return Integer value of the key; the lookup default is -1.
+   */
+  public static int getInt(Configuration configuration, String label) {
+    int value = configuration.getInt(label, -1);
+    assert value >= 0 : "Couldn't find " + label;
+    return value;
+  }
+
+  /**
+   * Utility to retrieve a specified key from a Configuration. Fails an
+   * assertion (only when assertions are enabled) if the value is negative.
+   * @param configuration The Configuration in which the key is sought.
+   * @param label The key being sought.
+   * @return Long value of the key; the lookup default is -1.
+   */
+  public static long getLong(Configuration configuration, String label) {
+    long value = configuration.getLong(label, -1);
+    assert value >= 0 : "Couldn't find " + label;
+    return value;
+  }
+
+  /**
+   * Returns the class that implements a copy strategy, read from the key
+   * "distcp.[strategy].strategy.impl" (default: UniformSizeInputFormat). The
+   * name is lower-cased locale-independently (avoids e.g. Turkish dotless-i).
+   * @param conf - Configuration object
+   * @param options - Handle to input options
+   * @return Class implementing the strategy specified in options.
+   */
+  public static Class<? extends InputFormat> getStrategy(Configuration conf,
+                                                                 DistCpOptions options) {
+    String confLabel = "distcp." +
+        options.getCopyStrategy().toLowerCase(Locale.ENGLISH) + ".strategy.impl";
+    return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class);
+  }
+
+  /**
+   * Gets relative path of child path with respect to a root path
+   * For ex. If childPath = /tmp/abc/xyz/file and
+   *            sourceRootPath = /tmp/abc
+   * Relative path would be /xyz/file
+   *         If childPath = /file and
+   *            sourceRootPath = /
+   * Relative path would be /file
+   * @param sourceRootPath - Source root path
+   * @param childPath - Path for which relative path is required
+   * @return - Relative portion of the child path (always prefixed with /
+   *           unless it is empty)
+   */
+  public static String getRelativePath(Path sourceRootPath, Path childPath) {
+    String childPathString = childPath.toUri().getPath();
+    String sourceRootPathString = sourceRootPath.toUri().getPath();
+    return sourceRootPathString.equals("/") ? childPathString :
+        childPathString.substring(sourceRootPathString.length());
+  }
+
+  /**
+   * Pack file preservation attributes into a string, containing
+   * just the first character of each preservation attribute
+   * @param attributes - Attribute set to preserve
+   * @return - String containing first letters of each attribute to preserve
+   */
+  public static String packAttributes(EnumSet<FileAttribute> attributes) {
+    // A method-local buffer is never shared between threads, so the
+    // unsynchronized StringBuilder replaces StringBuffer.
+    StringBuilder buffer = new StringBuilder(attributes.size());
+    for (FileAttribute attribute : attributes) {
+      buffer.append(attribute.name().charAt(0));
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Un packs preservation attribute string containing the first character of
+   * each preservation attribute back to a set of attributes to preserve
+   * @param attributes - Attribute string
+   * @return - Attribute set
+   */
+  public static EnumSet<FileAttribute> unpackAttributes(String attributes) {
+    EnumSet<FileAttribute> retValue = EnumSet.noneOf(FileAttribute.class);
+
+    if (attributes != null) {
+      for (int index = 0; index < attributes.length(); index++) {
+        retValue.add(FileAttribute.getAttribute(attributes.charAt(index)));
+      }
+    }
+
+    return retValue;
+  }
+
+  /**
+   * Preserve attribute on file matching that of the file status being sent
+   * as argument. Barring the block size, all the other attributes are preserved
+   * by this function
+   *
+   * @param targetFS - File system
+   * @param path - Path that needs to preserve original file status
+   * @param srcFileStatus - Original file status
+   * @param attributes - Attribute set that need to be preserved
+   * @throws IOException - Exception if any (particularly relating to group/owner
+   *                       change or any transient error)
+   */
+  public static void preserve(FileSystem targetFS, Path path,
+                              FileStatus srcFileStatus,
+                              EnumSet<FileAttribute> attributes) throws IOException {
+
+    FileStatus targetFileStatus = targetFS.getFileStatus(path);
+    String group = targetFileStatus.getGroup();
+    String user = targetFileStatus.getOwner();
+    boolean chown = false;
+
+    if (attributes.contains(FileAttribute.PERMISSION) &&
+      !srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
+      targetFS.setPermission(path, srcFileStatus.getPermission());
+    }
+
+    if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() &&
+        srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
+      targetFS.setReplication(path, srcFileStatus.getReplication());
+    }
+
+    if (attributes.contains(FileAttribute.GROUP) &&
+            !group.equals(srcFileStatus.getGroup())) {
+      group = srcFileStatus.getGroup();
+      chown = true;
+    }
+
+    if (attributes.contains(FileAttribute.USER) &&
+            !user.equals(srcFileStatus.getOwner())) {
+      user = srcFileStatus.getOwner();
+      chown = true;
+    }
+
+    if (chown) {
+      targetFS.setOwner(path, user, group);
+    }
+  }
+
+  /**
+   * Sort sequence file containing Text and FileStatus as key and value respectively
+   * An existing file at the output path is deleted before sorting.
+   * @param fs - File System
+   * @param conf - Configuration
+   * @param sourceListing - Source listing file
+   * @return Path of the sorted file. Is source file with _sorted appended to the name
+   * @throws IOException - Any exception during sort.
+   */
+  public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
+      throws IOException {
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
+    Path output = new Path(sourceListing.toString() +  "_sorted");
+
+    if (fs.exists(output)) {
+      fs.delete(output, false);
+    }
+
+    sorter.sort(sourceListing, output);
+    return output;
+  }
+
+  /**
+   * String utility to convert a number-of-bytes to human readable format.
+   */
+  private static ThreadLocal<DecimalFormat> FORMATTER
+                        = new ThreadLocal<DecimalFormat>() {
+    @Override
+    protected DecimalFormat initialValue() {
+      return new DecimalFormat("0.0");
+    }
+  };
+
+  public static DecimalFormat getFormatter() {
+    return FORMATTER.get();
+  }
+
+  public static String getStringDescriptionFor(long nBytes) {
+    // 1024-based unit suffixes; 'E' is needed for values of 2^60 and above.
+    char units [] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
+
+    double current = nBytes;
+    double prev    = current;
+    int index = 0;
+    // Divide by 1024 until the quotient drops below 1; prev keeps the last quotient >= 1.
+    while ((current = current/1024) >= 1) {
+      prev = current;
+      ++index;
+    }
+
+    assert index < units.length : "Too large a number.";
+
+    return getFormatter().format(prev) + units[index];
+  }
+
+  /**
+   * Utility to compare checksums for the paths specified.
+   *
+   * If a checksum can't be retrieved, the comparison does not fail.
+   * The only time the comparison fails is when both checksums are
+   * available and they don't match.
+   *
+   * @param sourceFS FileSystem for the source path.
+   * @param source The source path.
+   * @param targetFS FileSystem for the target path.
+   * @param target The target path.
+   * @return If either checksum couldn't be retrieved, the function returns
+   * true. If checksums are retrieved, the function returns true if they match,
+   * and false otherwise.
+   * @throws IOException declared; retrieval failures are caught and logged here.
+   */
+  public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
+                                   FileSystem targetFS, Path target)
+                                   throws IOException {
+    FileChecksum sourceChecksum = null;
+    FileChecksum targetChecksum = null;
+    try {
+      sourceChecksum = sourceFS.getFileChecksum(source);
+      targetChecksum = targetFS.getFileChecksum(target);
+    } catch (IOException e) {
+      LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
+    }
+    return (sourceChecksum == null || targetChecksum == null ||
+            sourceChecksum.equals(targetChecksum));
+  }
+
+  /* Checks whether two file systems are the same: equal URI scheme, equal
+   * canonical host name (or both hosts absent), and equal port.
+   */
+  public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch(UnknownHostException ue) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Could not compare file-systems. Unknown host: ", ue);
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    }
+    else if (srcHost == null && dstHost != null) {
+      return false;
+    }
+    else if (srcHost != null) {
+      return false;
+    }
+
+    //check for ports
+
+    return srcUri.getPort() == dstUri.getPort();
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents commands that can be retried on failure, in a
+ * configurable manner.
+ */
+public abstract class RetriableCommand {
+
+  private static Log LOG = LogFactory.getLog(RetriableCommand.class);
+
+  private static final long DELAY_MILLISECONDS = 500;
+  private static final int  MAX_RETRIES        = 3;
+
+  private RetryPolicy retryPolicy = RetryPolicies.
+      exponentialBackoffRetry(MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+  protected String description;
+
+  /**
+   * Constructor.
+   * @param description The human-readable description of the command.
+   */
+  public RetriableCommand(String description) {
+    this.description = description;
+  }
+
+  /**
+   * Constructor.
+   * @param description The human-readable description of the command.
+   * @param retryPolicy The RetryHandler to be used to compute retries.
+   */
+  public RetriableCommand(String description, RetryPolicy retryPolicy) {
+    this(description);
+    setRetryPolicy(retryPolicy);
+  }
+
+  /**
+   * Implement this method to define the command-logic that will be
+   * retried on failure (i.e. with Exception).
+   * @param arguments Argument-list to the command.
+   * @return Generic "Object".
+   * @throws Exception Throws Exception on complete failure.
+   */
+  protected abstract Object doExecute(Object... arguments) throws Exception;
+
+  /**
+   * The execute() method invokes doExecute() until either:
+   *  1. doExecute() succeeds, or
+   *  2. the command may no longer be retried (e.g. runs out of retry-attempts).
+   * @param arguments The list of arguments for the command.
+   * @return Generic "Object" from doExecute(), on success.
+   * @throws Exception An IOException wrapping the latest failure, on complete failure.
+   */
+  public Object execute(Object... arguments) throws Exception {
+    Exception latestException;
+    int counter = 0;
+    do {
+      try {
+        return doExecute(arguments);
+      } catch(Exception exception) {
+        LOG.error("Failure in Retriable command: " + description, exception);
+        latestException = exception;
+      }
+      counter++;
+    } while (retryPolicy.shouldRetry(latestException, counter, 0, true).equals(RetryPolicy.RetryAction.RETRY));
+
+    throw new IOException("Couldn't run retriable-command: " + description,
+                          latestException);
+  }
+
+  /**
+   * Fluent-interface to change the RetryHandler.
+   * @param retryHandler The new RetryHandler instance to be used.
+   * @return Self.
+   */
+  public RetriableCommand setRetryPolicy(RetryPolicy retryHandler) {
+    this.retryPolicy = retryHandler;
+    return this;
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottledInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer rate is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ */
+public class ThrottledInputStream extends InputStream {
+
+  private final InputStream rawStream;
+  private final long maxBytesPerSec;
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  private static final long SLEEP_DURATION_MS = 50;
+
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  private void throttle() throws IOException {
+    if (getBytesPerSec() <= maxBytesPerSec) return; // Within budget: no delay.
+    try {
+      Thread.sleep(SLEEP_DURATION_MS);
+      totalSleepTime += SLEEP_DURATION_MS;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // Keep the interrupt visible to callers.
+      throw new IOException("Thread aborted", e);
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly. Anything that needs to be overridden
+     should be changed through -D switches or a customized conf. -->
+
+<configuration>
+
+    <property>
+        <name>distcp.dynamic.strategy.impl</name>
+        <value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
+        <description>Implementation of dynamic input format</description>
+    </property>
+
+    <property>
+        <name>distcp.static.strategy.impl</name>
+        <value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
+        <description>Implementation of static input format</description>
+    </property>
+
+    <property>
+        <name>mapred.job.map.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.job.reduce.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.reducer.new-api</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.class</name>
+        <value>org.apache.hadoop.mapreduce.Reducer</value>
+    </property>
+
+</configuration>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<faqs xmlns="http://maven.apache.org/FML/1.0.1"
+      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+      xsi:schemaLocation="http://maven.apache.org/FML/1.0.1 http://maven.apache.org/xsd/fml-1.0.1.xsd"
+      title="Frequently Asked Questions">
+  <part id="General">
+    <title>General</title>
+
+    <faq id="Update">
+      <question>Why does -update not create the parent source-directory under
+      a pre-existing target directory?</question>
+      <answer>The behaviour of <code>-update</code> and <code>-overwrite</code>
+      is described in detail in the Usage section of this document. In short,
+      if either option is used with a pre-existing destination directory, the
+      <strong>contents</strong> of each source directory are copied over, rather
+      than the source-directory itself.
+      This behaviour is consistent with the legacy DistCp implementation as well.
+      </answer>
+    </faq>
+
+    <faq id="Deviation">
+      <question>How does the new DistCp differ in semantics from the Legacy
+      DistCp?</question>
+      <answer>
+          <ul>
+              <li>Files that are skipped during copy used to also have their
+              file-attributes (permissions, owner/group info, etc.) unchanged,
+              when copied with Legacy DistCp. These are now updated, even if
+              the file-copy is skipped.</li>
+              <li>Empty root directories among the source-path inputs were not
+              created at the target, in Legacy DistCp. These are now created.</li>
+          </ul>
+      </answer>
+    </faq>
+
+    <faq id="nMaps">
+      <question>Why does the new DistCp use more maps than legacy DistCp?</question>
+      <answer>
+          <p>Legacy DistCp works by figuring out what files need to be actually
+      copied to target <strong>before</strong> the copy-job is launched, and then
+      launching as many maps as required for copy. So if a majority of the files
+      need to be skipped (because they already exist, for example), fewer maps
+      will be needed. As a consequence, the time spent in setup (i.e. before the
+      M/R job) is higher.</p>
+          <p>The new DistCp calculates only the contents of the source-paths. It
+      doesn't try to filter out what files can be skipped. That decision is put
+      off till the M/R job runs. This is much faster (vis-a-vis execution-time),
+      but the number of maps launched will be as specified in the <code>-m</code>
+      option, or 20 (default) if unspecified.</p>
+      </answer>
+    </faq>
+
+    <faq id="more_maps">
+      <question>Why does DistCp not run faster when more maps are specified?</question>
+      <answer>
+          <p>At present, the smallest unit of work for DistCp is a file. i.e.,
+          a file is processed by only one map. Increasing the number of maps to
+          a value exceeding the number of files would yield no performance
+          benefit. The number of maps launched would equal the number of files.</p>
+      </answer>
+    </faq>
+
+    <faq id="client_mem">
+      <question>Why does DistCp run out of memory?</question>
+      <answer>
+          <p>If the number of individual files/directories being copied from
+      the source path(s) is extremely large (e.g. 1,000,000 paths), DistCp might
+      run out of memory while determining the list of paths for copy. This is
+      not unique to the new DistCp implementation.</p>
+          <p>To get around this, consider changing the <code>-Xmx</code> JVM
+      heap-size parameters, as follows:</p>
+          <p><code>bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"</code></p>
+          <p><code>bash$ hadoop distcp /source /target</code></p>
+      </answer>
+    </faq>
+
+  </part>
+</faqs>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<!-- START SNIPPET: docDescriptor -->
+<document xmlns="http://maven.apache.org/DOCUMENT/1.0.1"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/DOCUMENT/1.0.1 http://maven.apache.org/xsd/document-1.0.1.xsd"
+  outputName="distcp">
+
+  <meta>
+    <title>${project.name}</title>
+  </meta>
+
+  <toc name="Table of Contents">
+    <item name="Introduction" ref="index.xml"/>
+    <item name="Usage" ref="usage.xml"/>
+    <item name="Command Line Reference" ref="cli.xml"/>
+    <item name="Architecture" ref="architecture.xml"/>
+    <item name="Appendix" ref="appendix.xml"/>
+    <item name="FAQ" ref="faq.fml"/>
+  </toc>
+  <cover>
+    <coverTitle>${project.name}</coverTitle>
+    <coverSubTitle>v. ${project.version}</coverSubTitle>
+    <coverType>User Guide</coverType>
+    <projectName>${project.name}</projectName>
+    <companyName>Apache Hadoop</companyName>
+  </cover>
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <head>
+    <title>Appendix</title>
+  </head>
+  <body>
+    <section name="Map sizing">
+ 
+      <p> By default, DistCp makes an attempt to size each map comparably so
+      that each copies roughly the same number of bytes. Note that files are the
+      finest level of granularity, so increasing the number of simultaneous
+      copiers (i.e. maps) may not always increase the number of
+      simultaneous copies nor the overall throughput.</p>
+
+      <p> The new DistCp also provides a strategy to "dynamically" size maps,
+      allowing faster data-nodes to copy more bytes than slower nodes. Using
+      <code>-strategy dynamic</code> (explained in the Architecture), rather
+      than to assign a fixed set of source-files to each map-task, files are
+      instead split into several sets. The number of sets exceeds the number of
+      maps, usually by a factor of 2-3. Each map picks up and copies all files
+      listed in a chunk. When a chunk is exhausted, a new chunk is acquired and
+      processed, until no more chunks remain.</p>
+
+      <p> By not assigning a source-path to a fixed map, faster map-tasks (i.e.
+      data-nodes) are able to consume more chunks, and thus copy more data,
+      than slower nodes. While this distribution isn't uniform, it is
+      <strong>fair</strong> with regard to each mapper's capacity.</p>
+
+      <p>The dynamic-strategy is implemented by the DynamicInputFormat. It
+      provides superior performance under most conditions. </p>
+
+      <p>Tuning the number of maps to the size of the source and
+      destination clusters, the size of the copy, and the available
+      bandwidth is recommended for long-running and regularly run jobs.</p>
+
+   </section>
+
+   <section name="Copying between versions of HDFS">
+
+        <p>For copying between two different versions of Hadoop, one will
+        usually use HftpFileSystem. This is a read-only FileSystem, so DistCp
+        must be run on the destination cluster (more specifically, on
+        TaskTrackers that can write to the destination cluster). Each source is
+        specified as <code>hftp://&lt;dfs.http.address&gt;/&lt;path&gt;</code>
+        (the default <code>dfs.http.address</code> is
+        &lt;namenode&gt;:50070).</p>
+
+   </section>
+
+   <section name="Map/Reduce and other side-effects">
+
+        <p>As has been mentioned in the preceding, should a map fail to copy
+        one of its inputs, there will be several side-effects.</p>
+
+        <ul>
+
+          <li>Unless <code>-overwrite</code> is specified, files successfully
+          copied by a previous map on a re-execution will be marked as
+          &quot;skipped&quot;.</li>
+
+          <li>If a map fails <code>mapred.map.max.attempts</code> times, the
+          remaining map tasks will be killed (unless <code>-i</code> is
+          set).</li>
+
+          <li>If <code>mapred.speculative.execution</code> is set
+          <code>final</code> and <code>true</code>, the result of the copy is
+          undefined.</li>
+
+        </ul>
+
+   </section>
+
+   <section name="SSL Configurations for HSFTP sources:">
+
+       <p>To use an HSFTP source (i.e. using the hsftp protocol), a Map-Red SSL
+       configuration file needs to be specified (via the <code>-mapredSslConf</code>
+       option). This must specify 3 parameters:</p>
+
+       <ul>
+           <li><code>ssl.client.truststore.location</code>: The local-filesystem
+            location of the trust-store file, containing the certificate for
+            the namenode.</li>
+
+           <li><code>ssl.client.truststore.type</code>: (Optional) The format of
+           the trust-store file.</li>
+
+           <li><code>ssl.client.truststore.password</code>: (Optional) Password
+           for the trust-store file.</li>
+
+       </ul>
+
+       <p>The following is an example of the contents of
+       a Map-Red SSL Configuration file:</p>
+
+           <p> <br/> <code> &lt;configuration&gt; </code> </p>
+
+           <p> <br/> <code>&lt;property&gt; </code> </p>
+           <p> <code>&lt;name&gt;ssl.client.truststore.location&lt;/name&gt; </code> </p>
+           <p> <code>&lt;value&gt;/work/keystore.jks&lt;/value&gt; </code> </p>
+           <p> <code>&lt;description&gt;Truststore to be used by clients like distcp. Must be specified. &lt;/description&gt;</code> </p>
+           <p> <br/> <code>&lt;/property&gt; </code> </p>
+
+           <p><code> &lt;property&gt; </code> </p>
+           <p> <code>&lt;name&gt;ssl.client.truststore.password&lt;/name&gt; </code> </p>
+           <p> <code>&lt;value&gt;changeme&lt;/value&gt; </code> </p>
+           <p> <code>&lt;description&gt;Optional. Default value is "". &lt;/description&gt;  </code> </p>
+           <p> <code>&lt;/property&gt; </code>  </p>
+
+           <p> <br/> <code> &lt;property&gt; </code> </p>
+           <p> <code> &lt;name&gt;ssl.client.truststore.type&lt;/name&gt;</code>  </p>
+           <p> <code> &lt;value&gt;jks&lt;/value&gt;</code>  </p>
+           <p> <code> &lt;description&gt;Optional. Default value is "jks". &lt;/description&gt;</code>  </p>
+           <p> <code> &lt;/property&gt; </code> </p>
+
+           <p> <code> <br/> &lt;/configuration&gt; </code> </p>
+
+       <p><br/>The SSL configuration file must be in the class-path of the 
+       DistCp program.</p>
+
+   </section>
+
+  </body>
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+    <head>
+        <title>Architecture of DistCp</title>
+    </head>
+    <body>
+      <section name="Architecture">
+
+        <p>The components of the new DistCp may be classified into the following
+           categories: </p>
+
+        <ul>
+
+          <li>DistCp Driver</li>
+          <li>Copy-listing generator</li>
+          <li>Input-formats and Map-Reduce components</li>
+
+        </ul>
+
+        <subsection name="DistCp Driver">
+          <p>The DistCp Driver components are responsible for:</p>
+
+          <ul>
+            <li>Parsing the arguments passed to the DistCp command on the
+                command-line, via:
+              <ul>
+                <li>OptionsParser, and</li>
+                <li>DistCpOptionsSwitch</li>
+              </ul>
+            </li>
+            <li>Assembling the command arguments into an appropriate
+                DistCpOptions object, and initializing DistCp. These arguments
+                include:
+              <ul>
+                <li>Source-paths</li>
+                <li>Target location</li>
+                <li>Copy options (e.g. whether to update-copy, overwrite, which
+                    file-attributes to preserve, etc.)</li>
+              </ul>
+            </li>
+            <li>Orchestrating the copy operation by:
+              <ul>
+                <li>Invoking the copy-listing-generator to create the list of
+                    files to be copied.</li>
+                <li>Setting up and launching the Hadoop Map-Reduce Job to carry
+                    out the copy.</li>
+                <li>Based on the options, either returning a handle to the
+                    Hadoop MR Job immediately, or waiting till completion.</li>
+              </ul>
+            </li>
+          </ul>
+          <br/>
+
+          <p>The parser-elements are exercised only from the command-line (or if
+             DistCp::run() is invoked). The DistCp class may also be used
+             programmatically, by constructing the DistCpOptions object, and
+             initializing a DistCp object appropriately.</p>
+
+        </subsection>
+
+        <subsection name="Copy-listing generator">
+
+          <p>The copy-listing-generator classes are responsible for creating the
+             list of files/directories to be copied from source. They examine
+             the contents of the source-paths (files/directories, including
+             wild-cards), and record all paths that need copy into a sequence-
+             file, for consumption by the DistCp Hadoop Job. The main classes in
+             this module include:</p>
+
+          <ol>
+
+            <li>CopyListing: The interface that should be implemented by any 
+                copy-listing-generator implementation. Also provides the factory
+                method by which the concrete CopyListing implementation is
+                chosen.</li>
+
+            <li>SimpleCopyListing: An implementation of CopyListing that accepts
+                multiple source paths (files/directories), and recursively lists
+                all the individual files and directories under each, for
+                copy.</li>
+
+            <li>GlobbedCopyListing: Another implementation of CopyListing that
+                expands wild-cards in the source paths.</li>
+
+            <li>FileBasedCopyListing: An implementation of CopyListing that
+                reads the source-path list from a specified file.</li>
+
+          </ol>
+          <p/>
+
+          <p>Based on whether a source-file-list is specified in the
+             DistCpOptions, the source-listing is generated in one of the
+             following ways:</p>
+
+          <ol>
+
+            <li>If there's no source-file-list, the GlobbedCopyListing is used.
+                All wild-cards are expanded, and all the expansions are
+                forwarded to the SimpleCopyListing, which in turn constructs the
+                listing (via recursive descent of each path). </li>
+
+            <li>If a source-file-list is specified, the FileBasedCopyListing is
+                used. Source-paths are read from the specified file, and then
+                forwarded to the GlobbedCopyListing. The listing is then
+                constructed as described above.</li>
+
+          </ol>
+
+          <br/>
+
+          <p>One may customize the method by which the copy-listing is
+             constructed by providing a custom implementation of the CopyListing
+             interface. The behaviour of DistCp differs here from the legacy
+             DistCp, in how paths are considered for copy. </p>
+
+          <p>The legacy implementation only lists those paths that must
+             definitely be copied on to target.
+             E.g. if a file already exists at the target (and -overwrite isn't
+             specified), the file isn't even considered in the Map-Reduce Copy
+             Job. Determining this during setup (i.e. before the Map-Reduce Job)
+             involves file-size and checksum-comparisons that are potentially
+             time-consuming.</p>
+
+          <p>The new DistCp postpones such checks until the Map-Reduce Job, thus
+             reducing setup time. Performance is enhanced further since these
+             checks are parallelized across multiple maps.</p>
+
+        </subsection>
+
+        <subsection name="Input-formats and Map-Reduce components">
+
+          <p> The Input-formats and Map-Reduce components are responsible for
+              the actual copy of files and directories from the source to the
+              destination path. The listing-file created during copy-listing
+              generation is consumed at this point, when the copy is carried
+              out. The classes of interest here include:</p>
+
+          <ul>
+            <li><strong>UniformSizeInputFormat:</strong> This implementation of
+                org.apache.hadoop.mapreduce.InputFormat provides equivalence
+                with Legacy DistCp in balancing load across maps.
+                The aim of the UniformSizeInputFormat is to make each map copy
+                roughly the same number of bytes. Apropos, the listing file is
+                split into groups of paths, such that the sum of file-sizes in
+                each InputSplit is nearly equal to every other map. The splitting
+                isn't always perfect, but its trivial implementation keeps the
+                setup-time low.</li>
+
+            <li><strong>DynamicInputFormat and DynamicRecordReader:</strong>
+                <p> The DynamicInputFormat implements org.apache.hadoop.mapreduce.InputFormat,
+                and is new to DistCp. The listing-file is split into several
+                "chunk-files", the exact number of chunk-files being a multiple
+                of the number of maps requested for in the Hadoop Job. Each map
+                task is "assigned" one of the chunk-files (by renaming the chunk
+                to the task's id), before the Job is launched.</p>
+
+                <p>Paths are read from each chunk using the DynamicRecordReader,
+                and processed in the CopyMapper. After all the paths in a chunk
+                are processed, the current chunk is deleted and a new chunk is
+                acquired. The process continues until no more chunks are
+                available.</p>
+                <p>This "dynamic" approach allows faster map-tasks to consume
+                more paths than slower ones, thus speeding up the DistCp job
+                overall. </p>
+            </li>
+
+            <li><strong>CopyMapper:</strong> This class implements the physical
+                file-copy. The input-paths are checked against the input-options
+                (specified in the Job's Configuration), to determine whether a
+                file needs copy. A file will be copied only if at least one of
+                the following is true:
+              <ul>
+                <li>A file with the same name doesn't exist at target.</li>
+                <li>A file with the same name exists at target, but has a
+                    different file size.</li>
+                <li>A file with the same name exists at target, but has a
+                    different checksum, and -skipcrccheck isn't mentioned.</li>
+                <li>A file with the same name exists at target, but -overwrite
+                    is specified.</li>
+                <li>A file with the same name exists at target, but differs in
+                    block-size (and block-size needs to be preserved).</li>
+              </ul>
+            </li>
+
+            <li><strong>CopyCommitter:</strong>
+                This class is responsible for the commit-phase of the DistCp
+                job, including:
+              <ul>
+                <li>Preservation of directory-permissions (if specified in the
+                    options)</li>
+                <li>Clean-up of temporary-files, work-directories, etc.</li>
+              </ul>
+            </li>
+          </ul>
+        </subsection>
+      </section>
+    </body>
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <head>
+    <title>Command Line Options</title>
+  </head>
+  <body>
+      <section name="Options Index"> 
+        <table>
+          <tr><th> Flag </th><th> Description </th><th> Notes </th></tr>
+
+          <tr><td><code>-p[rbugp]</code></td>
+              <td>Preserve<br/>
+                  r: replication number<br/>
+                  b: block size<br/>
+                  u: user<br/>
+                  g: group<br/>
+                  p: permission<br/></td>
+              <td>Modification times are not preserved. Also, when
+              <code>-update</code> is specified, status updates will
+              <strong>not</strong> be synchronized unless the file sizes
+              also differ (i.e. unless the file is re-created).
+              </td></tr>
+          <tr><td><code>-i</code></td>
+              <td>Ignore failures</td>
+              <td>As explained in the Appendix, this option
+              will keep more accurate statistics about the copy than the
+              default case. It also preserves logs from failed copies, which
+              can be valuable for debugging. Finally, a failing map will not
+              cause the job to fail before all splits are attempted.
+              </td></tr>
+          <tr><td><code>-log &lt;logdir&gt;</code></td>
+              <td>Write logs to &lt;logdir&gt;</td>
+              <td>DistCp keeps logs of each file it attempts to copy as map
+              output. If a map fails, the log output will not be retained if
+              it is re-executed.
+              </td></tr>
+          <tr><td><code>-m &lt;num_maps&gt;</code></td>
+              <td>Maximum number of simultaneous copies</td>
+              <td>Specify the number of maps to copy data. Note that more maps
+              may not necessarily improve throughput.
+              </td></tr>
+          <tr><td><code>-overwrite</code></td>
+              <td>Overwrite destination</td>
+              <td>If a map fails and <code>-i</code> is not specified, all the
+              files in the split, not only those that failed, will be recopied.
+              As discussed in the Usage documentation, it also changes
+              the semantics for generating destination paths, so users should
+              use this carefully.
+              </td></tr>
+          <tr><td><code>-update</code></td>
+              <td>Overwrite if src size different from dst size</td>
+              <td>As noted in the preceding, this is not a &quot;sync&quot;
+              operation. The only criterion examined is the source and
+              destination file sizes; if they differ, the source file
+              replaces the destination file. As discussed in the
+              Usage documentation, it also changes the semantics for
+              generating destination paths, so users should use this carefully.
+              </td></tr>
+          <tr><td><code>-f &lt;urilist_uri&gt;</code></td>
+              <td>Use list at &lt;urilist_uri&gt; as src list</td>
+              <td>This is equivalent to listing each source on the command
+              line. The <code>urilist_uri</code> list should be a fully
+              qualified URI.
+              </td></tr>
+          <tr><td><code>-filelimit &lt;n&gt;</code></td>
+              <td>Limit the total number of files to be &lt;= n</td>
+              <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+              </td></tr>
+          <tr><td><code>-sizelimit &lt;n&gt;</code></td>
+              <td>Limit the total size to be &lt;= n bytes</td>
+              <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+              </td></tr>
+          <tr><td><code>-delete</code></td>
+              <td>Delete the files existing in the dst but not in src</td>
+              <td>The deletion is done by FS Shell.  So the trash will be used,
+                  if it is enabled.
+              </td></tr>
+          <tr><td><code>-strategy {dynamic|uniformsize}</code></td>
+              <td>Choose the copy-strategy to be used in DistCp.</td>
+              <td>By default, uniformsize is used. (i.e. Maps are balanced on the
+                  total size of files copied by each map. Similar to legacy.)
+                  If "dynamic" is specified, <code>DynamicInputFormat</code> is
+                  used instead. (This is described in the Architecture section,
+                  under InputFormats.)
+              </td></tr>
+          <tr><td><code>-bandwidth</code></td>
+                <td>Specify bandwidth per map, in MB/second.</td>
+                <td>Each map will be restricted to consume only the specified
+                    bandwidth. This is not always exact. The map throttles back
+                    its bandwidth consumption during a copy, such that the
+                    <strong>net</strong> bandwidth used tends towards the
+                    specified value.
+                </td></tr>
+          <tr><td><code>-atomic {-tmp &lt;tmp_dir&gt;}</code></td>
+                <td>Specify atomic commit, with optional tmp directory.</td>
+                <td><code>-atomic</code> instructs DistCp to copy the source
+                    data to a temporary target location, and then move the
+                    temporary target to the final-location atomically. Data will
+                    either be available at final target in a complete and consistent
+                    form, or not at all.
+                    Optionally, <code>-tmp</code> may be used to specify the
+                    location of the tmp-target. If not specified, a default is
+                    chosen. <strong>Note:</strong> tmp_dir must be on the final
+                    target cluster.
+                </td></tr>
+            <tr><td><code>-mapredSslConf &lt;ssl_conf_file&gt;</code></td>
+                  <td>Specify SSL Config file, to be used with HSFTP source</td>
+                  <td>When using the hsftp protocol with a source, the security-
+                      related properties may be specified in a config-file and
+                      passed to DistCp. &lt;ssl_conf_file&gt; needs to be in
+                      the classpath.
+                  </td></tr>
+            <tr><td><code>-async</code></td>
+                  <td>Run DistCp asynchronously. Quits as soon as the Hadoop
+                  Job is launched.</td>
+                  <td>The Hadoop Job-id is logged, for tracking.
+                  </td></tr>
+        </table>
+      </section>
+  </body>
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <head>
+    <title>DistCp</title>
+  </head>
+  <body>
+    <section name="Overview">
+      <p>
+        DistCp (distributed copy) is a tool used for large inter/intra-cluster
+      copying. It uses Map/Reduce to effect its distribution, error
+      handling and recovery, and reporting. It expands a list of files and
+      directories into input to map tasks, each of which will copy a partition
+      of the files specified in the source list.
+      </p>
+      <p>
+       The erstwhile implementation of DistCp has its share of quirks and
+       drawbacks, both in its usage, as well as its extensibility and
+       performance. The purpose of the DistCp refactor was to fix these shortcomings,
+       enabling it to be used and extended programmatically. New paradigms have
+       been introduced to improve runtime and setup performance, while simultaneously
+       retaining the legacy behaviour as default.
+      </p>
+      <p>
+       This document aims to describe the design of the new DistCp, its spanking
+       new features, their optimal use, and any deviations from the legacy
+       implementation.
+      </p>
+    </section>
+  </body>
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,147 @@
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <head>
+    <title>Usage </title>
+  </head>
+  <body>
+    <section name="Basic Usage">
+        <p>The most common invocation of DistCp is an inter-cluster copy:</p>
+        <p><code>bash$ hadoop jar hadoop-distcp.jar hdfs://nn1:8020/foo/bar \</code><br/>
+           <code>                    hdfs://nn2:8020/bar/foo</code></p>
+
+        <p>This will expand the namespace under <code>/foo/bar</code> on nn1
+        into a temporary file, partition its contents among a set of map
+        tasks, and start a copy on each TaskTracker from nn1 to nn2.</p>
+
+        <p>One can also specify multiple source directories on the command
+        line:</p>
+        <p><code>bash$ hadoop jar hadoop-distcp.jar hdfs://nn1:8020/foo/a \</code><br/>
+           <code> hdfs://nn1:8020/foo/b \</code><br/>
+           <code> hdfs://nn2:8020/bar/foo</code></p>
+
+        <p>Or, equivalently, from a file using the <code>-f</code> option:<br/>
+        <code>bash$ hadoop jar hadoop-distcp.jar -f hdfs://nn1:8020/srclist \</code><br/>
+        <code> hdfs://nn2:8020/bar/foo</code><br/></p>
+
+        <p>Where <code>srclist</code> contains<br/>
+        <code>hdfs://nn1:8020/foo/a</code><br/>
+        <code>hdfs://nn1:8020/foo/b</code></p>
+
+        <p>When copying from multiple sources, DistCp will abort the copy with
+        an error message if two sources collide, but collisions at the
+        destination are resolved per the <a href="#options">options</a>
+        specified. By default, files already existing at the destination are
+        skipped (i.e. not replaced by the source file). A count of skipped
+        files is reported at the end of each job, but it may be inaccurate if a
+        copier failed for some subset of its files, but succeeded on a later
+        attempt.</p>
+
+        <p>It is important that each TaskTracker can reach and communicate with
+        both the source and destination file systems. For HDFS, both the source
+        and destination must be running the same version of the protocol or use
+        a backwards-compatible protocol (see <a href="#cpver">Copying Between
+        Versions</a>).</p>
+
+        <p>After a copy, it is recommended that one generates and cross-checks
+        a listing of the source and destination to verify that the copy was
+        truly successful. Since DistCp employs both Map/Reduce and the
+        FileSystem API, issues in or between any of the three could adversely
+        and silently affect the copy. Some have had success running with
+        <code>-update</code> enabled to perform a second pass, but users should
+        be acquainted with its semantics before attempting this.</p>
+
+        <p>It's also worth noting that if another client is still writing to a
+        source file, the copy will likely fail. Attempting to overwrite a file
+        being written at the destination should also fail on HDFS. If a source
+        file is (re)moved before it is copied, the copy will fail with a
+        FileNotFoundException.</p>
+
+        <p>Please refer to the detailed Command Line Reference for information
+        on all the options available in DistCp.</p>
+        
+    </section>
+    <section name="Update and Overwrite">
+
+        <p><code>-update</code> is used to copy files from source that don't
+        exist at the target, or have different contents. <code>-overwrite</code>
+        overwrites target-files even if they already exist at the target and
+        have the same contents.</p>
+
+        <p><br/>Update and Overwrite options warrant special attention, since their
+        handling of source-paths varies from the defaults in a very subtle manner.
+        Consider a copy from <code>/source/first/</code> and
+        <code>/source/second/</code> to <code>/target/</code>, where the source
+        paths have the following contents:</p>
+
+        <p><code>hdfs://nn1:8020/source/first/1</code><br/>
+           <code>hdfs://nn1:8020/source/first/2</code><br/>
+           <code>hdfs://nn1:8020/source/second/10</code><br/>
+           <code>hdfs://nn1:8020/source/second/20</code><br/></p>
+
+        <p><br/>When DistCp is invoked without <code>-update</code> or
+        <code>-overwrite</code>, the DistCp defaults would create directories
+        <code>first/</code> and <code>second/</code>, under <code>/target</code>.
+        Thus:<br/></p>
+
+        <p><code>distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+        <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+        <p><code>hdfs://nn2:8020/target/first/1</code><br/>
+           <code>hdfs://nn2:8020/target/first/2</code><br/>
+           <code>hdfs://nn2:8020/target/second/10</code><br/>
+           <code>hdfs://nn2:8020/target/second/20</code><br/></p>
+
+        <p><br/>When either <code>-update</code> or <code>-overwrite</code> is
+            specified, the <strong>contents</strong> of the source-directories
+            are copied to target, and not the source directories themselves. Thus: </p>
+
+        <p><code>distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+        <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+        <p><code>hdfs://nn2:8020/target/1</code><br/>
+           <code>hdfs://nn2:8020/target/2</code><br/>
+           <code>hdfs://nn2:8020/target/10</code><br/>
+           <code>hdfs://nn2:8020/target/20</code><br/></p>
+
+        <p><br/>By extension, if both source folders contained a file with the same
+        name (say, <code>0</code>), then both sources would map an entry to
+        <code>/target/0</code> at the destination. Rather than permit this
+        conflict, DistCp will abort.</p>
+
+        <p><br/>Now, consider the following copy operation:</p>
+
+        <p><code>distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+        <p><br/>With sources/sizes:</p>
+
+        <p><code>hdfs://nn1:8020/source/first/1     32</code><br/>
+           <code>hdfs://nn1:8020/source/first/2     32</code><br/>
+           <code>hdfs://nn1:8020/source/second/10   64</code><br/>
+           <code>hdfs://nn1:8020/source/second/20   32</code><br/></p>
+
+        <p><br/>And destination/sizes:</p>
+
+        <p><code>hdfs://nn2:8020/target/1   32</code><br/>
+           <code>hdfs://nn2:8020/target/10  32</code><br/>
+           <code>hdfs://nn2:8020/target/20  64</code><br/></p>
+
+        <p><br/>Will effect: </p>
+
+        <p><code>hdfs://nn2:8020/target/1   32</code><br/>
+           <code>hdfs://nn2:8020/target/2   32</code><br/>
+           <code>hdfs://nn2:8020/target/10  64</code><br/>
+           <code>hdfs://nn2:8020/target/20  32</code><br/></p>
+
+        <p><br/>If <code>-update</code> is used, <code>1</code> is skipped
+        because the file-length and contents match.
+        <code>2</code> is copied because it doesn't exist at the target.
+        <code>10</code> and <code>20</code> are overwritten since the contents
+        don't match the source. </p>
+
+        <p>If <code>-overwrite</code> is used, <code>1</code> is overwritten as well.</p>
+
+    </section>
+  </body>
+
+</document>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
+/**
+ * Test fixture that assembles a {@link Mapper.Context} from in-memory stubs,
+ * so that a mapper can be driven directly from a unit test.
+ *
+ * The context reads its input from the {@link RecordReader} supplied by the
+ * caller, collects everything the mapper writes in a
+ * {@link StubInMemoryWriter}, and reports status and counters through a
+ * {@link StubStatusReporter}.
+ */
+public class StubContext {
+
+  private final StubStatusReporter reporter = new StubStatusReporter();
+  private final RecordReader<Text, FileStatus> reader;
+  private final StubInMemoryWriter writer = new StubInMemoryWriter();
+  private final Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+
+  /**
+   * Builds a mapper context wired to the stub reporter and writer.
+   *
+   * @param conf configuration handed to the underlying map context
+   * @param reader source of the (key, value) records the mapper will consume
+   * @param taskId task number used to fabricate the task-attempt id
+   * @throws IOException declared for callers; propagated from context setup
+   * @throws InterruptedException declared for callers; propagated from
+   *         context setup
+   */
+  public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
+                     int taskId) throws IOException, InterruptedException {
+
+    WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
+            = new WrappedMapper<Text, FileStatus, Text, Text>();
+
+    // NOTE(review): the two null arguments are presumably the output
+    // committer and the input split, neither of which these tests need.
+    MapContextImpl<Text, FileStatus, Text, Text> contextImpl
+            = new MapContextImpl<Text, FileStatus, Text, Text>(conf,
+            getTaskAttemptID(taskId), reader, writer,
+            null, reporter, null);
+
+    this.reader = reader;
+    this.mapperContext = wrappedMapper.getMapContext(contextImpl);
+  }
+
+  /** @return the mapper context built by the constructor. */
+  public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+    return mapperContext;
+  }
+
+  /** @return the stub reporter backing this context's counters. */
+  public StatusReporter getReporter() {
+    return reporter;
+  }
+
+  /** @return the record reader that was passed to the constructor. */
+  public RecordReader<Text, FileStatus> getReader() {
+    return reader;
+  }
+
+  /** @return the in-memory writer that captures the mapper's output. */
+  public StubInMemoryWriter getWriter() {
+    return writer;
+  }
+
+  /**
+   * Status reporter that keeps counters in a local {@link Counters} object.
+   * Progress and status updates are discarded.
+   */
+  public static class StubStatusReporter extends StatusReporter {
+
+    private final Counters counters = new Counters();
+
+    /** Creates a reporter with an empty set of counters. */
+    public StubStatusReporter() {
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return counters.findCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return counters.findCounter(group, name);
+    }
+
+    /** No-op: progress notifications are ignored. */
+    @Override
+    public void progress() {}
+
+    /** @return always {@code 0F}; progress is not tracked. */
+    @Override
+    public float getProgress() {
+      return 0F;
+    }
+
+    /** No-op: status strings are ignored. */
+    @Override
+    public void setStatus(String status) {}
+  }
+
+
+  /**
+   * Record writer that appends every key and value it receives to a pair of
+   * parallel lists, in write order, for later inspection by a test.
+   */
+  public static class StubInMemoryWriter extends RecordWriter<Text, Text> {
+
+    List<Text> keys = new ArrayList<Text>();
+
+    List<Text> values = new ArrayList<Text>();
+
+    /**
+     * Stores the references as given; no copy of {@code key} or
+     * {@code value} is made.
+     */
+    @Override
+    public void write(Text key, Text value) throws IOException, InterruptedException {
+      keys.add(key);
+      values.add(value);
+    }
+
+    /** No-op: there is nothing to flush or release. */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    /** @return the live list of keys written so far. */
+    public List<Text> keys() {
+      return keys;
+    }
+
+    /** @return the live list of values written so far. */
+    public List<Text> values() {
+      return values;
+    }
+
+  }
+
+  /**
+   * Fabricates a map-task attempt id for the given task number, using an
+   * empty identifier string and zero for the other two numeric fields.
+   *
+   * @param taskId task number to embed in the id
+   * @return a new {@link TaskAttemptID} of type {@link TaskType#MAP}
+   */
+  public static TaskAttemptID getTaskAttemptID(int taskId) {
+    return new TaskAttemptID("", 0, TaskType.MAP, taskId, 0);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestCopyListing extends SimpleCopyListing {
+  private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+  private static final Configuration config = new Configuration();
+  private static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+                                                .build();
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  public TestCopyListing() {
+    super(config, CREDENTIALS);
+  }
+
+  protected TestCopyListing(Configuration configuration) {
+    super(configuration, CREDENTIALS);
+  }
+
+  @Override
+  protected long getBytesToCopy() {
+    return 0;
+  }
+
+  @Override
+  protected long getNumberOfPaths() {
+    return 0;
+  }
+
+  @Test
+  public void testMultipleSrcToFile() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(new Path("/tmp/in/1"));
+      srcPaths.add(new Path("/tmp/in/2"));
+      Path target = new Path("/tmp/out/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/2");
+      fs.mkdirs(target);
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      validatePaths(options);
+      TestDistCpUtils.delete(fs, "/tmp");
+      //No errors
+
+      target = new Path("/tmp/out/1");
+      fs.create(target).close();
+      options = new DistCpOptions(srcPaths, target);
+      try {
+        validatePaths(options);
+        Assert.fail("Invalid inputs accepted");
+      } catch (InvalidInputException ignore) { }
+      TestDistCpUtils.delete(fs, "/tmp");
+
+      srcPaths.clear();
+      srcPaths.add(new Path("/tmp/in/1"));
+      fs.mkdirs(new Path("/tmp/in/1"));
+      target = new Path("/tmp/out/1");
+      fs.create(target).close();
+      options = new DistCpOptions(srcPaths, target);
+      try {
+        validatePaths(options);
+        Assert.fail("Invalid inputs accepted");
+      } catch (InvalidInputException ignore) { }
+      TestDistCpUtils.delete(fs, "/tmp");
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test input validation failed");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testDuplicates() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(new Path("/tmp/in/*/*"));
+      TestDistCpUtils.createFile(fs, "/tmp/in/1.txt");
+      TestDistCpUtils.createFile(fs, "/tmp/in/src/1.txt");
+      Path target = new Path("/tmp/out");
+      Path listingFile = new Path("/tmp/list");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS, options);
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Duplicates not detected");
+      } catch (DuplicateFileException ignore) {
+      }
+    } catch (IOException e) {
+      LOG.error("Exception encountered in test", e);
+      Assert.fail("Test failed " + e.getMessage());
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testBuildListing() {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<Path>();
+      Path p1 = new Path("/tmp/in/1");
+      Path p2 = new Path("/tmp/in/2");
+      Path p3 = new Path("/tmp/in2/2");
+      Path target = new Path("/tmp/out/1");
+      srcPaths.add(p1.getParent());
+      srcPaths.add(p3.getParent());
+      TestDistCpUtils.createFile(fs, "/tmp/in/1");
+      TestDistCpUtils.createFile(fs, "/tmp/in/2");
+      TestDistCpUtils.createFile(fs, "/tmp/in2/2");
+      fs.mkdirs(target);
+      OutputStream out = fs.create(p1);
+      out.write("ABC".getBytes());
+      out.close();
+
+      out = fs.create(p2);
+      out.write("DEF".getBytes());
+      out.close();
+
+      out = fs.create(p3);
+      out.write("GHIJ".getBytes());
+      out.close();
+
+      Path listingFile = new Path("/tmp/file");
+
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+      CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Duplicates not detected");
+      } catch (DuplicateFileException ignore) {
+      }
+      Assert.assertEquals(listing.getBytesToCopy(), 10);
+      Assert.assertEquals(listing.getNumberOfPaths(), 3);
+      TestDistCpUtils.delete(fs, "/tmp");
+
+      try {
+        listing.buildListing(listingFile, options);
+        Assert.fail("Invalid input not detected");
+      } catch (InvalidInputException ignore) {
+      }
+      TestDistCpUtils.delete(fs, "/tmp");
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Test build listing failed");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  @Test
+  public void testBuildListingForSingleFile() {
+    FileSystem fs = null;
+    String testRootString = "/singleFileListing";
+    Path testRoot = new Path(testRootString);
+    SequenceFile.Reader reader = null;
+    try {
+      fs = FileSystem.get(getConf());
+      if (fs.exists(testRoot))
+        TestDistCpUtils.delete(fs, testRootString);
+
+      Path sourceFile = new Path(testRoot, "/source/foo/bar/source.txt");
+      Path decoyFile  = new Path(testRoot, "/target/moo/source.txt");
+      Path targetFile = new Path(testRoot, "/target/moo/target.txt");
+
+      TestDistCpUtils.createFile(fs, sourceFile.toString());
+      TestDistCpUtils.createFile(fs, decoyFile.toString());
+      TestDistCpUtils.createFile(fs, targetFile.toString());
+
+      List<Path> srcPaths = new ArrayList<Path>();
+      srcPaths.add(sourceFile);
+
+      DistCpOptions options = new DistCpOptions(srcPaths, targetFile);
+      CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
+      listing.buildListing(listFile, options);
+
+      reader = new SequenceFile.Reader(fs, listFile, getConf());
+      FileStatus fileStatus = new FileStatus();
+      Text relativePath = new Text();
+      Assert.assertTrue(reader.next(relativePath, fileStatus));
+      Assert.assertTrue(relativePath.toString().equals(""));
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception encountered.");
+      LOG.error("Unexpected exception: ", e);
+    }
+    finally {
+      TestDistCpUtils.delete(fs, testRootString);
+      IOUtils.closeStream(reader);
+    }
+  }
+}