Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC

svn commit: r1236045 [1/5] - in /hadoop/common/trunk: hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/ hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...

Author: mahadev
Date: Thu Jan 26 06:36:52 2012
New Revision: 1236045

URL: http://svn.apache.org/viewvc?rev=1236045&view=rev
Log:
MAPREDUCE-2765. DistCp Rewrite. (Mithun Radhakrishnan via mahadev)

Added:
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/README
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
Modified:
    hadoop/common/trunk/hadoop-project/pom.xml
    hadoop/common/trunk/hadoop-tools/pom.xml

Modified: hadoop/common/trunk/hadoop-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-project/pom.xml?rev=1236045&r1=1236044&r2=1236045&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-project/pom.xml Thu Jan 26 06:36:52 2012
@@ -710,10 +710,20 @@
           <version>2.4</version>
         </plugin>
         <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-resources-plugin</artifactId>
+          <version>2.2</version>
+        </plugin>
+        <plugin>
           <groupId>org.codehaus.mojo</groupId>
           <artifactId>exec-maven-plugin</artifactId>
           <version>1.2</version>
         </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-pdf-plugin</artifactId>
+          <version>1.1</version>
+        </plugin>
       </plugins>
     </pluginManagement>
 
@@ -773,6 +783,14 @@
           </excludes>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-pdf-plugin</artifactId>
+        <configuration>
+          <outputDirectory>${project.reporting.outputDirectory}</outputDirectory>
+          <includeReports>false</includeReports>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/README
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/README?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/README (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/README Thu Jan 26 06:36:52 2012
@@ -0,0 +1,7 @@
+DistCp (distributed copy) is a tool used for large inter/intra-cluster copying. 
+It uses Map/Reduce to effect its distribution, error handling and recovery, 
+and reporting. It expands a list of files and directories into input to map tasks, 
+each of which will copy a partition of the files specified in the source list.
+
+Version 0.1 (2010/08/02 sriksun)
+ - Initial Version

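In day-to-day use the tool described above is driven from the command line; a typical invocation (with purely illustrative cluster and path names) would look something like:

    hadoop distcp hdfs://nn1:8020/source/dir hdfs://nn2:8020/target/dir

This expands the source listing and launches the map-only copy job described in the README; the full option set is documented in the site docs added under src/site.
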
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,185 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>0.23.1-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop.tools</groupId>
+  <artifactId>hadoop-distcp</artifactId>
+  <version>0.23.1-SNAPSHOT</version>
+  <description>Apache Hadoop Distributed Copy</description>
+  <name>Apache Hadoop Distributed Copy</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources</directory>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+          <argLine>-Xmx1024m</argLine>
+          <includes>
+            <include>**/Test*.java</include>
+          </includes>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <systemProperties>
+            <property>
+              <name>test.build.data</name>
+              <value>${basedir}/target/test/data</value>
+            </property>
+            <property>
+              <name>hadoop.log.dir</name>
+              <value>target/test/logs</value>
+            </property>
+            <property>
+              <name>org.apache.commons.logging.Log</name>
+              <value>org.apache.commons.logging.impl.SimpleLog</value>
+            </property>
+            <property>
+              <name>org.apache.commons.logging.simplelog.defaultlog</name>
+              <value>warn</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <enableRulesSummary>true</enableRulesSummary>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.hadoop.tools.DistCp</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <configuration>
+          <attach>true</attach>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-pdf-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>pdf</id>
+            <phase>package</phase>
+            <goals>
+              <goal>pdf</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+
+/**
+ * The CopyListing abstraction is responsible for how the list of
+ * sources and targets is constructed, for DistCp's copy function.
+ * The copy-listing should be a SequenceFile<Text, FileStatus>,
+ * located at the path specified to buildListing(),
+ * each entry being a pair of (Source relative path, source file status),
+ * all the paths being fully qualified.
+ */
+public abstract class CopyListing extends Configured {
+
+  private Credentials credentials;
+
+  /**
+   * Build listing function creates the input listing that distcp uses to
+   * perform the copy.
+   *
+   * The build listing is a sequence file that has the relative path of a file as the key
+   * and the file status information of the source file as the value
+   *
+   * For instance if the source path is /tmp/data and the traversed path is
+   * /tmp/data/dir1/dir2/file1, then the sequence file would contain
+   *
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * The file also contains directory entries. That is, if /tmp/data/dir1/dir2/file1
+   * is the only file under /tmp/data, the resulting sequence file would contain the
+   * following entries
+   *
+   * key: /dir1 and value: FileStatus(/tmp/data/dir1)
+   * key: /dir1/dir2 and value: FileStatus(/tmp/data/dir1/dir2)
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * Cases requiring special handling:
+   * If source path is a file (/tmp/file1), contents of the file will be as follows
+   *
+   * TARGET DOES NOT EXIST: Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS FILE       : Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS DIR        : Key-"/file1", Value-FileStatus(/tmp/file1)  
+   *
+   * @param pathToListFile - Output file where the listing would be stored
+   * @param options - Input options to distcp
+   * @throws IOException - Exception if any
+   */
+  public final void buildListing(Path pathToListFile,
+                                 DistCpOptions options) throws IOException {
+    validatePaths(options);
+    doBuildListing(pathToListFile, options);
+    Configuration config = getConf();
+
+    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
+
+    checkForDuplicates(pathToListFile);
+  }
+
+  /**
+   * Validate input and output paths
+   *
+   * @param options - Input options
+   * @throws InvalidInputException: If inputs are invalid
+   * @throws IOException: on any filesystem error
+   */
+  protected abstract void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException;
+
+  /**
+   * The interface to be implemented by sub-classes, to create the source/target file listing.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException: Thrown on failure to create the listing file.
+   */
+  protected abstract void doBuildListing(Path pathToListFile,
+                                         DistCpOptions options) throws IOException;
+
+  /**
+   * Return the total bytes that distCp should copy for the source paths
+   * This doesn't consider whether a file is identical at the target and should be skipped during copy
+   *
+   * @return total bytes to copy
+   */
+  protected abstract long getBytesToCopy();
+
+  /**
+   * Return the total number of paths to distcp, includes directories as well
+   * This doesn't consider whether file/dir is already present and should be skipped during copy
+   *
+   * @return Total number of paths to distcp
+   */
+  protected abstract long getNumberOfPaths();
+
+  /**
+   * Validate the final resulting path listing to see if there are any duplicate entries
+   *
+   * @param pathToListFile - path listing built by doBuildListing
+   * @throws IOException - on any issue encountered while checking for duplicates
+   * @throws DuplicateFileException - if there are duplicates
+   */
+  private void checkForDuplicates(Path pathToListFile)
+      throws DuplicateFileException, IOException {
+
+    Configuration config = getConf();
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(
+                          config, SequenceFile.Reader.file(sortedList));
+    try {
+      Text lastKey = new Text("*"); //source relative path can never hold *
+      FileStatus lastFileStatus = new FileStatus();
+
+      Text currentKey = new Text();
+      while (reader.next(currentKey)) {
+        if (currentKey.equals(lastKey)) {
+          FileStatus currentFileStatus = new FileStatus();
+          reader.getCurrentValue(currentFileStatus);
+          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
+              currentFileStatus.getPath() + " would cause duplicates. Aborting");
+        }
+        reader.getCurrentValue(lastFileStatus);
+        lastKey.set(currentKey);
+      }
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  /**
+   * Protected constructor, to initialize configuration.
+   * @param configuration The input configuration,
+   *                        with which the source/target FileSystems may be accessed.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null,
+   * delegation token caching is skipped
+   */
+  protected CopyListing(Configuration configuration, Credentials credentials) {
+    setConf(configuration);
+    setCredentials(credentials);
+  }
+
+  /**
+   * Set the Credentials store, on which FS delegation tokens will be cached
+   * @param credentials - Credentials object
+   */
+  protected void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  /**
+   * get credentials to update the delegation tokens for accessed FS objects
+   * @return Credentials object
+   */
+  protected Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Public Factory method with which the appropriate CopyListing implementation may be retrieved.
+   * @param configuration The input configuration.
+   * @param credentials Credentials object on which the FS delegation tokens are cached
+   * @param options The input Options, to help choose the appropriate CopyListing Implementation.
+   * @return An instance of the appropriate CopyListing implementation.
+   */
+  public static CopyListing getCopyListing(Configuration configuration,
+                                           Credentials credentials,
+                                           DistCpOptions options) {
+    if (options.getSourceFileListing() == null) {
+      return new GlobbedCopyListing(configuration, credentials);
+    } else {
+      return new FileBasedCopyListing(configuration, credentials);
+    }
+  }
+
+  static class DuplicateFileException extends RuntimeException {
+    public DuplicateFileException(String message) {
+      super(message);
+    }
+  }
+
+  static class InvalidInputException extends RuntimeException {
+    public InvalidInputException(String message) {
+      super(message);
+    }
+  }
+}

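As the class javadoc above notes, the copy-listing is a SequenceFile of (Text source-relative path, FileStatus). A minimal sketch of a reader for such a listing, assuming a listing file already produced by buildListing() and using a hypothetical class name, could look like:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    // Illustrative only; not part of this commit.
    public class ListingDump {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path listing = new Path(args[0]);   // path previously passed to buildListing()
        SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(listing));
        try {
          Text relPath = new Text();            // key: source-relative path
          FileStatus status = new FileStatus(); // value: status of the source file/dir
          while (reader.next(relPath, status)) {
            System.out.println(relPath + " -> " + status.getPath());
          }
        } finally {
          reader.close();
        }
      }
    }
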
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.tools.CopyListing.*;
+import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.apache.hadoop.tools.mapred.CopyOutputFormat;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * DistCp is the main driver-class for DistCpV2.
+ * For command-line use, DistCp::main() orchestrates the parsing of command-line
+ * parameters and the launch of the DistCp job.
+ * For programmatic use, a DistCp object can be constructed by specifying
+ * options (in a DistCpOptions object), and DistCp::execute() may be used to
+ * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
+ * behaviour.
+ */
+public class DistCp extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(DistCp.class);
+
+  private DistCpOptions inputOptions;
+  private Path metaFolder;
+
+  private static final String PREFIX = "_distcp";
+  private static final String WIP_PREFIX = "._WIP_";
+  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
+  public static final Random rand = new Random();
+
+  private boolean submitted;
+  private FileSystem jobFS;
+
+  /**
+   * Public Constructor. Creates DistCp object with specified input-parameters.
+   * (E.g. source-paths, target-location, etc.)
+   * @param inputOptions Options (indicating source-paths, target-location.)
+   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+   * @throws Exception, on failure.
+   */
+  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+    Configuration config = new Configuration(configuration);
+    config.addResource(DISTCP_DEFAULT_XML);
+    setConf(config);
+    this.inputOptions = inputOptions;
+    this.metaFolder   = createMetaFolderPath();
+  }
+
+  /**
+   * To be used with the ToolRunner. Not for public consumption.
+   */
+  private DistCp() {}
+
+  /**
+   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
+   * to target location, by:
+   *  1. Creating a list of files to be copied to target.
+   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
+   * @param argv List of arguments passed to DistCp, from the ToolRunner.
+   * @return On success, it returns 0. Else, -1.
+   */
+  public int run(String[] argv) {
+    try {
+      inputOptions = (OptionsParser.parse(argv));
+
+      LOG.info("Input Options: " + inputOptions);
+    } catch (Throwable e) {
+      LOG.error("Invalid arguments: ", e);
+      System.err.println("Invalid arguments: " + e.getMessage());
+      OptionsParser.usage();      
+      return DistCpConstants.INVALID_ARGUMENT;
+    }
+    
+    try {
+      execute();
+    } catch (InvalidInputException e) {
+      LOG.error("Invalid input: ", e);
+      return DistCpConstants.INVALID_ARGUMENT;
+    } catch (DuplicateFileException e) {
+      LOG.error("Duplicate files in input path: ", e);
+      return DistCpConstants.DUPLICATE_INPUT;
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      return DistCpConstants.UNKNOWN_ERROR;
+    }
+    return DistCpConstants.SUCCESS;
+  }
+
+  /**
+   * Implements the core-execution. Creates the file-list for copy,
+   * and launches the Hadoop-job, to do the copy.
+   * @return Job handle
+   * @throws Exception, on failure.
+   */
+  public Job execute() throws Exception {
+    assert inputOptions != null;
+    assert getConf() != null;
+
+    Job job = null;
+    try {
+      metaFolder = createMetaFolderPath();
+      jobFS = metaFolder.getFileSystem(getConf());
+
+      job = createJob();
+      createInputFileListing(job);
+
+      job.submit();
+      submitted = true;
+    } finally {
+      if (!submitted) {
+        cleanup();
+      }
+    }
+
+    String jobID = job.getJobID().toString();
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+    
+    LOG.info("DistCp job-id: " + jobID);
+    if (inputOptions.shouldBlock()) {
+      job.waitForCompletion(true);
+    }
+    return job;
+  }
+
+  /**
+   * Create Job object for submitting it, with all the configuration
+   *
+   * @return Reference to job object.
+   * @throws IOException - Exception if any
+   */
+  private Job createJob() throws IOException {
+    String jobName = "distcp";
+    String userChosenName = getConf().get(JobContext.JOB_NAME);
+    if (userChosenName != null)
+      jobName += ": " + userChosenName;
+    Job job = Job.getInstance(getConf());
+    job.setJobName(jobName);
+    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+    job.setJarByClass(CopyMapper.class);
+    configureOutputFormat(job);
+
+    job.setMapperClass(CopyMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputFormatClass(CopyOutputFormat.class);
+    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
+    job.getConfiguration().set(JobContext.NUM_MAPS,
+                  String.valueOf(inputOptions.getMaxMaps()));
+
+    if (inputOptions.getSslConfigurationFile() != null) {
+      setupSSLConfig(job);
+    }
+
+    inputOptions.appendToConf(job.getConfiguration());
+    return job;
+  }
+
+  /**
+   * Setup ssl configuration on the job configuration to enable hsftp access
+   * from map job. Also copy the ssl configuration file to Distributed cache
+   *
+   * @param job - Reference to job's handle
+   * @throws java.io.IOException - Exception if unable to locate ssl config file
+   */
+  private void setupSSLConfig(Job job) throws IOException  {
+    Configuration configuration = job.getConfiguration();
+    Path sslConfigPath = new Path(configuration.
+        getResource(inputOptions.getSslConfigurationFile()).toString());
+
+    addSSLFilesToDistCache(job, sslConfigPath);
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
+  }
+
+  /**
+   * Add SSL files to distributed cache. Trust store, key store and ssl config xml
+   *
+   * @param job - Job handle
+   * @param sslConfigPath - ssl Configuration file specified through options
+   * @throws IOException - If any
+   */
+  private void addSSLFilesToDistCache(Job job,
+                                      Path sslConfigPath) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    FileSystem localFS = FileSystem.getLocal(configuration);
+
+    Configuration sslConf = new Configuration(false);
+    sslConf.addResource(sslConfigPath);
+
+    Path localStorePath = getLocalStorePath(sslConf,
+                            DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
+    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
+                      localStorePath.getName());
+
+    localStorePath = getLocalStorePath(sslConf,
+                             DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
+    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
+                                      localStorePath.getName());
+
+    job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+
+  }
+
+  /**
+   * Get Local Trust store/key store path
+   *
+   * @param sslConf - Config from SSL Client xml
+   * @param storeKey - Key for either trust store or key store
+   * @return - Path where the store is present
+   * @throws IOException -If any
+   */
+  private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
+    if (sslConf.get(storeKey) != null) {
+      return new Path(sslConf.get(storeKey));
+    } else {
+      throw new IOException("Store for " + storeKey + " is not set in " +
+          inputOptions.getSslConfigurationFile());
+    }
+  }
+
+  /**
+   * Setup output format appropriately
+   *
+   * @param job - Job handle
+   * @throws IOException - Exception if any
+   */
+  private void configureOutputFormat(Job job) throws IOException {
+    final Configuration configuration = job.getConfiguration();
+    Path targetPath = inputOptions.getTargetPath();
+    FileSystem targetFS = targetPath.getFileSystem(configuration);
+    targetPath = targetPath.makeQualified(targetFS.getUri(),
+                                          targetFS.getWorkingDirectory());
+
+    if (inputOptions.shouldAtomicCommit()) {
+      Path workDir = inputOptions.getAtomicWorkPath();
+      if (workDir == null) {
+        workDir = targetPath.getParent();
+      }
+      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+                                + rand.nextInt());
+      FileSystem workFS = workDir.getFileSystem(configuration);
+      if (!DistCpUtils.compareFs(targetFS, workFS)) {
+        throw new IllegalArgumentException("Work path " + workDir +
+            " and target path " + targetPath + " are in different file systems");
+      }
+      CopyOutputFormat.setWorkingDirectory(job, workDir);
+    } else {
+      CopyOutputFormat.setWorkingDirectory(job, targetPath);
+    }
+    CopyOutputFormat.setCommitDirectory(job, targetPath);
+
+    Path logPath = inputOptions.getLogPath();
+    if (logPath == null) {
+      logPath = new Path(metaFolder, "_logs");
+    } else {
+      LOG.info("DistCp job log path: " + logPath);
+    }
+    CopyOutputFormat.setOutputPath(job, logPath);
+  }
+
+  /**
+   * Create input listing by invoking an appropriate copy listing
+   * implementation. Also add delegation tokens for each path
+   * to job's credential store
+   *
+   * @param job - Handle to job
+   * @return Returns the path where the copy listing is created
+   * @throws IOException - If any
+   */
+  private Path createInputFileListing(Job job) throws IOException {
+    Path fileListingPath = getFileListingPath();
+    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
+        job.getCredentials(), inputOptions);
+    copyListing.buildListing(fileListingPath, inputOptions);
+    return fileListingPath;
+  }
+
+  /**
+   * Get default name of the copy listing file. Use the meta folder
+   * to create the copy listing file
+   *
+   * @return - Path where the copy listing file has to be saved
+   * @throws IOException - Exception if any
+   */
+  private Path getFileListingPath() throws IOException {
+    String fileListPathStr = metaFolder + "/fileList.seq";
+    Path path = new Path(fileListPathStr);
+    return new Path(path.toUri().normalize().toString());
+  }
+
+  /**
+   * Create a default working folder for the job, under the
+   * job staging directory
+   *
+   * @return Returns the working folder information
+   * @throws Exception - Exception if any
+   */
+  private Path createMetaFolderPath() throws Exception {
+    Configuration configuration = getConf();
+    Path stagingDir = JobSubmissionFiles.getStagingDir(
+            new Cluster(configuration), configuration);
+    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
+    if (LOG.isDebugEnabled())
+      LOG.debug("Meta folder location: " + metaFolderPath);
+    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
+    return metaFolderPath;
+  }
+
+  /**
+   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
+   * and invokes the DistCp::run() method, via the ToolRunner.
+   * @param argv Command-line arguments sent to DistCp.
+   */
+  public static void main(String argv[]) {
+    try {
+      DistCp distCp = new DistCp();
+      Cleanup CLEANUP = new Cleanup(distCp);
+
+      Runtime.getRuntime().addShutdownHook(CLEANUP);
+      System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
+    }
+    catch (Exception e) {
+      LOG.error("Couldn't complete DistCp operation: ", e);
+      System.exit(DistCpConstants.UNKNOWN_ERROR);
+    }
+  }
+
+  /**
+   * Loads properties from distcp-default.xml into configuration
+   * object
+   * @return Configuration which includes properties from distcp-default.xml
+   */
+  private static Configuration getDefaultConf() {
+    Configuration config = new Configuration();
+    config.addResource(DISTCP_DEFAULT_XML);
+    return config;
+  }
+
+  private synchronized void cleanup() {
+    try {
+      if (metaFolder == null) return;
+
+      jobFS.delete(metaFolder, true);
+      metaFolder = null;
+    } catch (IOException e) {
+      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
+    }
+  }
+
+  private boolean isSubmitted() {
+    return submitted;
+  }
+
+  private static class Cleanup extends Thread {
+    private final DistCp distCp;
+
+    public Cleanup(DistCp distCp) {
+      this.distCp = distCp;
+    }
+
+    @Override
+    public void run() {
+      if (distCp.isSubmitted()) return;
+
+      distCp.cleanup();
+    }
+  }
+}

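The DistCp class javadoc above mentions programmatic use: construct a DistCpOptions, pass it to the public DistCp constructor, and call execute(). A minimal sketch, with hypothetical class, cluster and path names:

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    // Illustrative only; not part of this commit.
    public class ProgrammaticDistCp {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistCpOptions options = new DistCpOptions(
            Arrays.asList(new Path("hdfs://nn1:8020/source/dir")),
            new Path("hdfs://nn2:8020/target/dir"));
        options.setSyncFolder(true);   // same effect as the -update switch

        DistCp distCp = new DistCp(conf, options);
        Job job = distCp.execute();    // submits (and by default waits for) the copy job
        System.out.println("DistCp job: " + job.getJobID());
      }
    }
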
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,104 @@
+package org.apache.hadoop.tools;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class to hold commonly used constants.
+ */
+public class DistCpConstants {
+
+  /* Default number of maps to use for DistCp */
+  public static final int DEFAULT_MAPS = 20;
+
+  /* Default bandwidth if none specified */
+  public static final int DEFAULT_BANDWIDTH_MB = 100;
+
+  /* Default strategy for copying. Implementation looked up
+     from distcp-default.xml
+   */
+  public static final String UNIFORMSIZE = "uniformsize";
+
+  /**
+   *  Constants mapping to command line switches/input options
+   */
+  public static final String CONF_LABEL_ATOMIC_COPY = "distcp.atomic.copy";
+  public static final String CONF_LABEL_WORK_PATH = "distcp.work.path";
+  public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
+  public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
+  public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
+  public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
+  public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+  public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+  public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
+  public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
+  public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
+  public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
+  public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+  public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+
+  /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
+
+  /* Total number of paths to copy, includes directories. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
+
+  /* SSL keystore resource */
+  public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
+
+  /* If input is based on -f <<source listing>>, the file containing the src paths */
+  public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
+
+  /* Directory where the mapreduce job will write to. If not atomic commit, then same
+    as CONF_LABEL_TARGET_FINAL_PATH
+   */
+  public static final String CONF_LABEL_TARGET_WORK_PATH = "distcp.target.work.path";
+
+  /* Directory where the final data will be committed to. If not atomic commit, then same
+    as CONF_LABEL_TARGET_WORK_PATH
+   */
+  public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
+
+  /**
+   * DistCp job id for consumers of DistCp
+   */
+  public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";
+
+  /* Meta folder where the job's intermediate data is kept */
+  public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+
+  /**
+   * Conf label for SSL Trust-store location.
+   */
+  public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
+      = "ssl.client.truststore.location";
+
+  /**
+   * Conf label for SSL Key-store location.
+   */
+  public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
+      = "ssl.client.keystore.location";
+
+  /**
+   * Constants for DistCp return code to shell / consumer of ToolRunner's run
+   */
+  public static final int SUCCESS = 0;
+  public static final int INVALID_ARGUMENT = -1;
+  public static final int DUPLICATE_INPUT = -2;
+  public static final int UNKNOWN_ERROR = -999;
+}

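Since the CONF_LABEL_* names above are ordinary Configuration keys, the values that CopyListing.buildListing() and DistCp.execute() record can be read back from the job configuration. A small sketch, assuming a configuration already populated by those calls and a hypothetical class name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpConstants;

    // Illustrative only; not part of this commit.
    public class DistCpConfProbe {
      public static void report(Configuration conf) {
        long bytes = conf.getLong(
            DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
        long paths = conf.getLong(
            DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, 0);
        String listing = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
        String jobId = conf.get(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID);

        System.out.println("Listing file : " + listing);
        System.out.println("Total bytes  : " + bytes);
        System.out.println("Total paths  : " + paths);
        System.out.println("Job id       : " + jobId);
      }
    }
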
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enumeration mapping configuration keys to distcp command line
+ * options.
+ */
+public enum DistCpOptionSwitch {
+
+  /**
+   * Ignores any failures during copy, and continues with rest.
+   * Logs failures in a file
+   */
+  IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
+      new Option("i", false, "Ignore failures during copy")),
+
+  /**
+   * Preserves status of file/path in the target.
+   * Default behavior with -p, is to preserve replication,
+   * block size, user, group and permission on the target file
+   *
+   * If any of the optional switches are present among rbugp, then
+   * only the corresponding file attribute is preserved
+   *
+   */
+  PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+      new Option("p", true, "preserve status (rbugp)" +
+          "(replication, block-size, user, group, permission)")),
+
+  /**
+   * Update target location by copying only files that are missing
+   * in the target. This can be used to periodically sync two folders
+   * across source and target. Typically used with DELETE_MISSING
+   * Incompatible with ATOMIC_COMMIT
+   */
+  SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, 
+      new Option("update", false, "Update target, copying only missing" +
+          "files or directories")),
+
+  /**
+   * Deletes files in the target that are missing from the source
+   * This allows the target to be in sync with the source contents
+   * Typically used in conjunction with SYNC_FOLDERS
+   * Incompatible with ATOMIC_COMMIT
+   */
+  DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
+      new Option("delete", false, "Delete from target, " +
+          "files missing in source")),
+
+  /**
+   * Configuration file to use with hsftp:// for securely copying
+   * files across clusters. Typically the configuration file contains
+   * truststore/keystore information such as location, password and type
+   */
+  SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
+      new Option("mapredSslConf", true, "Configuration for ssl config file" +
+          ", to use with hftps://")),
+
+  /**
+   * Max number of maps to use during copy. DistCp will split work
+   * as equally as possible among these maps
+   */
+  MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, 
+      new Option("m", true, "Max number of concurrent maps to use for copy")),
+
+  /**
+   * Source file listing can be provided to DistCp in a file.
+   * This allows DistCp to pick up an arbitrary list of files from the source
+   * and copy them to the target
+   */
+  SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
+      new Option("f", true, "List of files that need to be copied")),
+
+  /**
+   * Copy all the source files and commit them atomically to the target
+   * This is typically useful in cases where there is a process
+   * polling for availability of a file/dir. This option is incompatible
+   * with SYNC_FOLDERS & DELETE_MISSING
+   */
+  ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
+      new Option("atomic", false, "Commit all changes or none")),
+
+  /**
+   * Work path to be used only in conjunction with atomic commit
+   */
+  WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
+      new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
+
+  /**
+   * Log path where distcp output logs are written to
+   */
+  LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
+      new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
+
+  /**
+   * Copy strategy to use. This could be dynamic or uniform size, etc.
+   * DistCp would use an appropriate input format based on this.
+   */
+  COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
+      new Option("strategy", true, "Copy strategy to use. Default is " +
+          "dividing work based on file sizes")),
+
+  /**
+   * Skip CRC checks between source and target, when determining what
+   * files need to be copied.
+   */
+  SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
+      new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
+          "source and target paths.")),
+
+  /**
+   * Overwrite target-files unconditionally.
+   */
+  OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
+      new Option("overwrite", false, "Choose to overwrite target files " +
+          "unconditionally, even if they exist.")),
+
+  /**
+   * Should DistCp execution be blocking
+   */
+  BLOCKING("",
+      new Option("async", false, "Should distcp execution be blocking")),
+
+  FILE_LIMIT("",
+      new Option("filelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n")),
+
+  SIZE_LIMIT("",
+      new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n bytes")),
+
+  /**
+   * Specify bandwidth per map in MB
+   */
+  BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+      new Option("bandwidth", true, "Specify bandwidth per map in MB"));
+
+  private final String confLabel;
+  private final Option option;
+
+  DistCpOptionSwitch(String confLabel, Option option) {
+    this.confLabel = confLabel;
+    this.option = option;
+  }
+
+  /**
+   * Get Configuration label for the option
+   * @return configuration label name
+   */
+  public String getConfigLabel() {
+    return confLabel;
+  }
+
+  /**
+   * Get CLI Option corresponding to the distcp option
+   * @return option
+   */
+  public Option getOption() {
+    return option;
+  }
+
+  /**
+   * Get Switch symbol
+   * @return switch symbol char
+   */
+  public String getSwitch() {
+    return option.getOpt();
+  }
+
+  @Override
+  public String toString() {
+    return  super.name() + " {" +
+        "confLabel='" + confLabel + '\'' +
+        ", option=" + option + '}';
+  }
+
+  /**
+   * Helper function to add an option to hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   * @param value - Value
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option,
+                               String value) {
+    conf.set(option.getConfigLabel(), value);
+  }
+
+  /**
+   * Helper function to set an option to hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option) {
+    conf.set(option.getConfigLabel(), "true");
+  }
+}

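Each DistCpOptionSwitch value ties a command-line switch to its configuration label, and the addToConf() helpers push a parsed switch onto a Configuration. A minimal sketch of how a consumer (such as the OptionsParser added in this commit) might use them, with a hypothetical class name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpOptionSwitch;

    // Illustrative only; not part of this commit.
    public class OptionSwitchDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Boolean switch: stores "true" under distcp.sync.folders.
        DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS);

        // Valued switch: stores the value under distcp.max.maps.
        DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.MAX_MAPS, "40");

        System.out.println(DistCpOptionSwitch.MAX_MAPS.getConfigLabel() + " = "
            + conf.get(DistCpOptionSwitch.MAX_MAPS.getConfigLabel()));
      }
    }
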
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.util.DistCpUtils;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The Options class encapsulates all DistCp options.
+ * These may be set from command-line (via the OptionsParser)
+ * or may be set manually.
+ */
+public class DistCpOptions {
+
+  private boolean atomicCommit = false;
+  private boolean syncFolder = false;
+  private boolean deleteMissing = false;
+  private boolean ignoreFailures = false;
+  private boolean overwrite = false;
+  private boolean skipCRC = false;
+  private boolean blocking = true;
+
+  private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+
+  private String sslConfigurationFile;
+
+  private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+  private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+
+  private Path atomicWorkPath;
+
+  private Path logPath;
+
+  private Path sourceFileListing;
+  private List<Path> sourcePaths;
+
+  private Path targetPath;
+
+  public static enum FileAttribute{
+    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+
+    public static FileAttribute getAttribute(char symbol) {
+      for (FileAttribute attribute : values()) {
+        if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) {
+          return attribute;
+        }
+      }
+      throw new NoSuchElementException("No attribute for " + symbol);
+    }
+  }
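+
+  /*
+   * A quick sketch of the symbol-to-attribute mapping above, using example
+   * symbols (illustrative only):
+   *
+   *   FileAttribute.getAttribute('u')   // USER
+   *   FileAttribute.getAttribute('B')   // BLOCKSIZE
+   *   FileAttribute.getAttribute('x')   // throws NoSuchElementException
+   */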
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourcePaths List of source-paths (including wildcards)
+   *                     to be copied to target.
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
+    assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourcePaths = sourcePaths;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourceFileListing File containing list of source paths
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(Path sourceFileListing, Path targetPath) {
+    assert sourceFileListing != null : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourceFileListing = sourceFileListing;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Copy constructor.
+   * @param that DistCpOptions being copied from.
+   */
+  public DistCpOptions(DistCpOptions that) {
+    if (this != that && that != null) {
+      this.atomicCommit = that.atomicCommit;
+      this.syncFolder = that.syncFolder;
+      this.deleteMissing = that.deleteMissing;
+      this.ignoreFailures = that.ignoreFailures;
+      this.overwrite = that.overwrite;
+      this.skipCRC = that.skipCRC;
+      this.blocking = that.blocking;
+      this.maxMaps = that.maxMaps;
+      this.mapBandwidth = that.mapBandwidth;
+      this.sslConfigurationFile = that.getSslConfigurationFile();
+      this.copyStrategy = that.copyStrategy;
+      this.preserveStatus = that.preserveStatus;
+      this.atomicWorkPath = that.getAtomicWorkPath();
+      this.logPath = that.getLogPath();
+      this.sourceFileListing = that.getSourceFileListing();
+      this.sourcePaths = that.getSourcePaths();
+      this.targetPath = that.getTargetPath();
+    }
+  }
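+
+  /*
+   * A minimal sketch of building options programmatically; the paths and
+   * settings below are examples only:
+   *
+   *   List<Path> sources = Arrays.asList(new Path("hdfs://nn1:8020/src"));
+   *   DistCpOptions options = new DistCpOptions(sources, new Path("hdfs://nn2:8020/dst"));
+   *   options.setSyncFolder(true);                    // sync/update mode
+   *   options.setMaxMaps(20);
+   *   options.preserve(FileAttribute.REPLICATION);    // keep replication factor
+   */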
+
+  /**
+   * Should the data be committed atomically?
+   *
+   * @return true if data should be committed atomically. false otherwise
+   */
+  public boolean shouldAtomicCommit() {
+    return atomicCommit;
+  }
+
+  /**
+   * Set if data needs to be committed atomically
+   *
+   * @param atomicCommit - boolean switch
+   */
+  public void setAtomicCommit(boolean atomicCommit) {
+    validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit);
+    this.atomicCommit = atomicCommit;
+  }
+
+  /**
+   * Should the data be sync'ed between source and target paths?
+   *
+   * @return true if data should be sync'ed up. false otherwise
+   */
+  public boolean shouldSyncFolder() {
+    return syncFolder;
+  }
+
+  /**
+   * Set if source and target folder contents should be sync'ed up
+   *
+   * @param syncFolder - boolean switch
+   */
+  public void setSyncFolder(boolean syncFolder) {
+    validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder);
+    this.syncFolder = syncFolder;
+  }
+
+  /**
+   * Should target files missing in source be deleted?
+   *
+   * @return true if zombie target files are to be removed. false otherwise
+   */
+  public boolean shouldDeleteMissing() {
+    return deleteMissing;
+  }
+
+  /**
+   * Set if files only present in target should be deleted
+   *
+   * @param deleteMissing - boolean switch
+   */
+  public void setDeleteMissing(boolean deleteMissing) {
+    validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
+    this.deleteMissing = deleteMissing;
+  }
+
+  /**
+   * Should failures be logged and ignored during copy?
+   *
+   * @return true if failures are to be logged and ignored. false otherwise
+   */
+  public boolean shouldIgnoreFailures() {
+    return ignoreFailures;
+  }
+
+  /**
+   * Set if failures during copy should be ignored
+   *
+   * @param ignoreFailures - boolean switch
+   */
+  public void setIgnoreFailures(boolean ignoreFailures) {
+    this.ignoreFailures = ignoreFailures;
+  }
+
+  /**
+   * Should DistCp run in blocking mode?
+   *
+   * @return true if it should run in blocking mode, false otherwise
+   */
+  public boolean shouldBlock() {
+    return blocking;
+  }
+
+  /**
+   * Set if DistCp should run in blocking or non-blocking mode
+   *
+   * @param blocking - boolean switch
+   */
+  public void setBlocking(boolean blocking) {
+    this.blocking = blocking;
+  }
+
+  /**
+   * Should files always be overwritten?
+   *
+   * @return true if files already present in the target before distcp should
+   *         always be overwritten. false otherwise
+   */
+  public boolean shouldOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * Set if files should always be overwritten on target
+   *
+   * @param overwrite - boolean switch
+   */
+  public void setOverwrite(boolean overwrite) {
+    validate(DistCpOptionSwitch.OVERWRITE, overwrite);
+    this.overwrite = overwrite;
+  }
+
+  /**
+   * Should the CRC/checksum check be skipped when determining whether files are identical?
+   *
+   * @return true if the checksum check should be skipped when determining whether
+   *         files are identical. false otherwise
+   */
+  public boolean shouldSkipCRC() {
+    return skipCRC;
+  }
+
+  /**
+   * Set if checksum comparison should be skipped while determining if
+   * source and destination files are identical
+   *
+   * @param skipCRC - boolean switch
+   */
+  public void setSkipCRC(boolean skipCRC) {
+    validate(DistCpOptionSwitch.SKIP_CRC, skipCRC);
+    this.skipCRC = skipCRC;
+  }
+
+  /** Get the max number of maps to use for this copy
+   *
+   * @return Max number of maps
+   */
+  public int getMaxMaps() {
+    return maxMaps;
+  }
+
+  /**
+   * Set the max number of maps to use for copy
+   *
+   * @param maxMaps - Number of maps
+   */
+  public void setMaxMaps(int maxMaps) {
+    this.maxMaps = maxMaps;
+  }
+
+  /** Get the map bandwidth in MB
+   *
+   * @return Bandwidth in MB
+   */
+  public int getMapBandwidth() {
+    return mapBandwidth;
+  }
+
+  /**
+   * Set per map bandwidth
+   *
+   * @param mapBandwidth - per map bandwidth
+   */
+  public void setMapBandwidth(int mapBandwidth) {
+    assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
+    this.mapBandwidth = mapBandwidth;
+  }
+
+  /**
+   * Get the path of the SSL configuration file to use for hftps://
+   *
+   * @return Path on local file system
+   */
+  public String getSslConfigurationFile() {
+    return sslConfigurationFile;
+  }
+
+  /**
+   * Set the SSL configuration file path to use with hftps:// (local path)
+   *
+   * @param sslConfigurationFile - Local ssl config file path
+   */
+  public void setSslConfigurationFile(String sslConfigurationFile) {
+    this.sslConfigurationFile = sslConfigurationFile;
+  }
+
+  /**
+   * Returns an iterator with the list of file attributes to preserve
+   *
+   * @return iterator of file attributes to preserve
+   */
+  public Iterator<FileAttribute> preserveAttributes() {
+    return preserveStatus.iterator();
+  }
+
+  /**
+   * Checks if the input attribute should be preserved or not
+   *
+   * @param attribute - Attribute to check
+   * @return True if attribute should be preserved, false otherwise
+   */
+  public boolean shouldPreserve(FileAttribute attribute) {
+    return preserveStatus.contains(attribute);
+  }
+
+  /**
+   * Add file attributes that need to be preserved. This method may be
+   * called multiple times to add attributes.
+   *
+   * @param fileAttribute - Attribute to add, one at a time
+   */
+  public void preserve(FileAttribute fileAttribute) {
+    for (FileAttribute attribute : preserveStatus) {
+      if (attribute.equals(fileAttribute)) {
+        return;
+      }
+    }
+    preserveStatus.add(fileAttribute);
+  }
+
+  /** Get work path for atomic commit. If null, the work
+   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
+   *
+   * @return Atomic work path on the target cluster. Null if not set
+   */
+  public Path getAtomicWorkPath() {
+    return atomicWorkPath;
+  }
+
+  /**
+   * Set the work path for atomic commit
+   *
+   * @param atomicWorkPath - Path on the target cluster
+   */
+  public void setAtomicWorkPath(Path atomicWorkPath) {
+    this.atomicWorkPath = atomicWorkPath;
+  }
+
+  /** Get the output directory for writing distcp logs. If not specified, logs
+   * are temporarily written to JobStagingDir/_logs and deleted
+   * upon job completion
+   *
+   * @return Log output path on the cluster where distcp job is run
+   */
+  public Path getLogPath() {
+    return logPath;
+  }
+
+  /**
+   * Set the log path where distcp output logs are stored.
+   * Uses JobStagingDir/_logs by default
+   *
+   * @param logPath - Path where logs will be saved
+   */
+  public void setLogPath(Path logPath) {
+    this.logPath = logPath;
+  }
+
+  /**
+   * Get the copy strategy to use. The strategy determines the input format used.
+   *
+   * @return copy strategy to use
+   */
+  public String getCopyStrategy() {
+    return copyStrategy;
+  }
+
+  /**
+   * Set the copy strategy to use. Should map to a strategy implementation
+   * in distcp-default.xml
+   *
+   * @param copyStrategy - copy Strategy to use
+   */
+  public void setCopyStrategy(String copyStrategy) {
+    this.copyStrategy = copyStrategy;
+  }
+
+  /**
+   * File path (hdfs:// or file://) that contains the list of actual
+   * files to copy
+   *
+   * @return - Source listing file path
+   */
+  public Path getSourceFileListing() {
+    return sourceFileListing;
+  }
+
+  /**
+   * Getter for sourcePaths.
+   * @return List of source-paths.
+   */
+  public List<Path> getSourcePaths() {
+    return sourcePaths;
+  }
+
+  /**
+   * Setter for sourcePaths.
+   * @param sourcePaths The new list of source-paths.
+   */
+  public void setSourcePaths(List<Path> sourcePaths) {
+    assert sourcePaths != null && sourcePaths.size() != 0;
+    this.sourcePaths = sourcePaths;
+  }
+
+  /**
+   * Getter for the targetPath.
+   * @return The target-path.
+   */
+  public Path getTargetPath() {
+    return targetPath;
+  }
+
+  public void validate(DistCpOptionSwitch option, boolean value) {
+
+    boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
+        value : this.syncFolder);
+    boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ?
+        value : this.overwrite);
+    boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ?
+        value : this.deleteMissing);
+    boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ?
+        value : this.atomicCommit);
+    boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
+        value : this.skipCRC);
+
+    if (syncFolder && atomicCommit) {
+      throw new IllegalArgumentException("Atomic commit can't be used with " +
+          "sync folder or overwrite options");
+    }
+
+    if (deleteMissing && !(overwrite || syncFolder)) {
+      throw new IllegalArgumentException("Delete missing is applicable " +
+          "only with update or overwrite options");
+    }
+
+    if (overwrite && syncFolder) {
+      throw new IllegalArgumentException("Overwrite and update options are " +
+          "mutually exclusive");
+    }
+
+    if (!syncFolder && skipCRC) {
+      throw new IllegalArgumentException("Skip CRC is valid only with update options");
+    }
+
+  }
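+
+  /*
+   * Two independent examples of combinations that validate() rejects
+   * (sources/target below are placeholders):
+   *
+   *   DistCpOptions o = new DistCpOptions(sources, target);
+   *   o.setDeleteMissing(true);   // throws: delete-missing needs update or overwrite
+   *
+   *   DistCpOptions p = new DistCpOptions(sources, target);
+   *   p.setSyncFolder(true);
+   *   p.setOverwrite(true);       // throws: overwrite and update are mutually exclusive
+   */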
+
+  /**
+   * Add options to configuration. These will be used in the Mapper/committer
+   *
+   * @param conf - Configuration object to which the options need to be added
+   */
+  public void appendToConf(Configuration conf) {
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
+        String.valueOf(atomicCommit));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
+        String.valueOf(ignoreFailures));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
+        String.valueOf(syncFolder));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
+        String.valueOf(deleteMissing));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
+        String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
+        String.valueOf(skipCRC));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
+        String.valueOf(mapBandwidth));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+  }
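+
+  /*
+   * Sketch of how a task-side class could read one of these values back from
+   * the job configuration; only getConfigLabel() is assumed from
+   * DistCpOptionSwitch:
+   *
+   *   boolean ignoreFailures = conf.getBoolean(
+   *       DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+   */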
+
+  /**
+   * Utility to easily string-ify Options, for logging.
+   *
+   * @return String representation of the Options.
+   */
+  @Override
+  public String toString() {
+    return "DistCpOptions{" +
+        "atomicCommit=" + atomicCommit +
+        ", syncFolder=" + syncFolder +
+        ", deleteMissing=" + deleteMissing +
+        ", ignoreFailures=" + ignoreFailures +
+        ", maxMaps=" + maxMaps +
+        ", sslConfigurationFile='" + sslConfigurationFile + '\'' +
+        ", copyStrategy='" + copyStrategy + '\'' +
+        ", sourceFileListing=" + sourceFileListing +
+        ", sourcePaths=" + sourcePaths +
+        ", targetPath=" + targetPath +
+        '}';
+  }
+
+  @Override
+  protected DistCpOptions clone() throws CloneNotSupportedException {
+    return (DistCpOptions) super.clone();
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ */
+public class FileBasedCopyListing extends CopyListing {
+
+  private final CopyListing globbedListing;
+  /**
+   * Constructor, to initialize base-class.
+   * @param configuration The input Configuration object.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public FileBasedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    globbedListing = new GlobbedCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   *   Iterates over all source paths mentioned in the input-file.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   */
+  @Override
+  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
+    DistCpOptions newOption = new DistCpOptions(options);
+    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, newOption);
+  }
+
+  private List<Path> fetchFileList(Path sourceListing) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = sourceListing.getFileSystem(getConf());
+    BufferedReader input = null;
+    try {
+      input = new BufferedReader(new InputStreamReader(fs.open(sourceListing)));
+      String line = input.readLine();
+      while (line != null) {
+        result.add(new Path(line));
+        line = input.readLine();
+      }
+    } finally {
+      IOUtils.closeStream(input);
+    }
+    return result;
+  }
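+
+  /*
+   * The listing file read above is plain text, one source path per line, e.g.
+   * (paths are illustrative):
+   *
+   *   hdfs://nn1:8020/data/d1
+   *   hdfs://nn1:8020/data/d2/file1
+   *   hdfs://nn1:8020/logs/2012-01-*
+   *
+   * Each line is wrapped in a Path and handed to GlobbedCopyListing, so
+   * wild-cards in the file are expanded just like command-line source paths.
+   */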
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return globbedListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return globbedListing.getNumberOfPaths();
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * GlobbedCopyListing implements the CopyListing interface, to create the copy
+ * listing-file by "globbing" all specified source paths (wild-cards and all.)
+ */
+public class GlobbedCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(GlobbedCopyListing.class);
+
+  private final CopyListing simpleListing;
+  /**
+   * Constructor, to initialize the configuration.
+   * @param configuration The input Configuration object.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public GlobbedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    simpleListing = new SimpleCopyListing(getConf(), credentials) ;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Creates the copy listing by "globbing" all source-paths.
+   * @param pathToListingFile The location at which the copy-listing file
+   *                           is to be created.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   */
+  @Override
+  public void doBuildListing(Path pathToListingFile,
+                             DistCpOptions options) throws IOException {
+
+    List<Path> globbedPaths = new ArrayList<Path>();
+    if (options.getSourcePaths().isEmpty()) {
+      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");  
+    }
+
+    for (Path p : options.getSourcePaths()) {
+      FileSystem fs = p.getFileSystem(getConf());
+      FileStatus[] inputs = fs.globStatus(p);
+
+      if(inputs != null && inputs.length > 0) {
+        for (FileStatus onePath: inputs) {
+          globbedPaths.add(onePath.getPath());
+        }
+      } else {
+        throw new InvalidInputException(p + " doesn't exist");        
+      }
+    }
+
+    DistCpOptions optionsGlobbed = new DistCpOptions(options);
+    optionsGlobbed.setSourcePaths(globbedPaths);
+    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
+  }
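+
+  /*
+   * Illustration of the globbing step above (paths are examples):
+   *
+   *   source path:  hdfs://nn1:8020/data/2012-01-*
+   *   expands to:   hdfs://nn1:8020/data/2012-01-25, hdfs://nn1:8020/data/2012-01-26, ...
+   *
+   * The expanded paths are then handed to SimpleCopyListing; a pattern that
+   * matches nothing results in an InvalidInputException.
+   */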
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return simpleListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return simpleListing.getNumberOfPaths();
+  }
+
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+
+import java.util.*;
+
+/**
+ * The OptionsParser parses out the command-line options passed to DistCp,
+ * and interprets those specific to DistCp, to create an Options object.
+ */
+public class OptionsParser {
+
+  private static final Log LOG = LogFactory.getLog(OptionsParser.class);
+
+  private static final Options cliOptions = new Options();      
+
+  static {
+    for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding option " + option.getOption());
+      }
+      cliOptions.addOption(option.getOption());
+    }
+  }
+
+  private static class CustomParser extends GnuParser {
+    @Override
+    protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
+      for (int index = 0; index < arguments.length; index++) {
+        if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+          arguments[index] = "-prbugp";
+        }
+      }
+      return super.flatten(options, arguments, stopAtNonOption);
+    }
+  }
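+
+  /*
+   * Illustration of the rewrite above, assuming "-p" is the preserve-status
+   * switch: a bare "-p" on the command line becomes "-prbugp", which the
+   * preserve-status handling in parse() then reads as preserving all five
+   * attributes (replication, block-size, user, group, permission).
+   */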
+
+  /**
+   * The parse method parses the command-line options, and creates
+   * a corresponding Options object.
+   * @param args Command-line arguments (excluding the options consumed
+   *              by the GenericOptionsParser).
+   * @return The Options object, corresponding to the specified command-line.
+   * @throws IllegalArgumentException Thrown if the parse fails.
+   */
+  public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
+
+    CommandLineParser parser = new CustomParser();
+
+    CommandLine command;
+    try {
+      command = parser.parse(cliOptions, args, true);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Unable to parse arguments. " +
+        Arrays.toString(args), e);
+    }
+
+    DistCpOptions option;
+    Path targetPath;
+    List<Path> sourcePaths = new ArrayList<Path>();
+
+    String leftOverArgs[] = command.getArgs();
+    if (leftOverArgs == null || leftOverArgs.length < 1) {
+      throw new IllegalArgumentException("Target path not specified");
+    }
+
+    //Last Argument is the target path
+    targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
+
+    //Copy any source paths in the arguments to the list
+    for (int index = 0; index < leftOverArgs.length - 1; index++) {
+      sourcePaths.add(new Path(leftOverArgs[index].trim()));
+    }
+
+    /* If the command has a source file listing, use it; else fall back on the source
+       paths in args. If both are present, throw an exception and bail */
+    if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
+      if (!sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Both source file listing and source paths present");
+      }
+      option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
+              SOURCE_FILE_LISTING.getSwitch())), targetPath);
+    } else {
+      if (sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Neither source file listing nor source paths present");
+      }
+      option = new DistCpOptions(sourcePaths, targetPath);
+    }
+
+    //Process all the other option switches and set options appropriately
+    if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
+      option.setIgnoreFailures(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) {
+      option.setAtomicCommit(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
+        option.shouldAtomicCommit()) {
+      String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
+      if (workPath != null && !workPath.isEmpty()) {
+        option.setAtomicWorkPath(new Path(workPath));
+      }
+    } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");      
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+      option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) {
+      option.setSyncFolder(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) {
+      option.setOverwrite(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
+      option.setDeleteMissing(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) {
+      option.setSkipCRC(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
+      option.setBlocking(false);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+      try {
+        Integer mapBandwidth = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
+        option.setMapBandwidth(mapBandwidth);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
+      option.setSslConfigurationFile(command.
+          getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
+      try {
+        Integer maps = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
+        option.setMaxMaps(maps);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Number of maps is invalid: " +
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+      option.setCopyStrategy(
+            getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+      String attributes =
+          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
+      if (attributes == null || attributes.isEmpty()) {
+        for (FileAttribute attribute : FileAttribute.values()) {
+          option.preserve(attribute);
+        }
+      } else {
+        for (int index = 0; index < attributes.length(); index++) {
+          option.preserve(FileAttribute.
+              getAttribute(attributes.charAt(index)));
+        }
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+      String fileLimitString = getVal(command,
+                              DistCpOptionSwitch.FILE_LIMIT.getSwitch());
+      try {
+        Integer.parseInt(fileLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("File-limit is invalid: "
+                                            + fileLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+      String sizeLimitString = getVal(command,
+                              DistCpOptionSwitch.SIZE_LIMIT.getSwitch());
+      try {
+        Long.parseLong(sizeLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Size-limit is invalid: "
+                                            + sizeLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    return option;
+  }
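+
+  /*
+   * A minimal usage sketch (URIs are examples): the last argument is the
+   * target, everything before it is a source path.
+   *
+   *   DistCpOptions options = OptionsParser.parse(new String[] {
+   *       "hdfs://nn1:8020/src/a", "hdfs://nn1:8020/src/b", "hdfs://nn2:8020/dst" });
+   *
+   *   options.getSourcePaths();   // two source Paths
+   *   options.getTargetPath();    // hdfs://nn2:8020/dst
+   */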
+
+  private static String getVal(CommandLine command, String swtch) {
+    String optionValue = command.getOptionValue(swtch);
+    if (optionValue == null) {
+      return null;
+    } else {
+      return optionValue.trim();
+    }
+  }
+
+  public static void usage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);
+  }
+}