You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC
svn commit: r1236045 [1/5] - in /hadoop/common/trunk: hadoop-project/
hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/
hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...
Author: mahadev
Date: Thu Jan 26 06:36:52 2012
New Revision: 1236045
URL: http://svn.apache.org/viewvc?rev=1236045&view=rev
Log:
MAPREDUCE-2765. DistCp Rewrite. (Mithun Radhakrishnan via mahadev)
Added:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/README
hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestFileBasedCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestGlobbedCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyOutputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestRetriableCommand.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/resources/sslConfig.xml
Modified:
hadoop/common/trunk/hadoop-project/pom.xml
hadoop/common/trunk/hadoop-tools/pom.xml
Modified: hadoop/common/trunk/hadoop-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-project/pom.xml?rev=1236045&r1=1236044&r2=1236045&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-project/pom.xml Thu Jan 26 06:36:52 2012
@@ -710,10 +710,20 @@
<version>2.4</version>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.2</version>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pdf-plugin</artifactId>
+ <version>1.1</version>
+ </plugin>
</plugins>
</pluginManagement>
@@ -773,6 +783,14 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pdf-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.reporting.outputDirectory}</outputDirectory>
+ <includeReports>false</includeReports>
+ </configuration>
+ </plugin>
</plugins>
</build>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/README
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/README?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/README (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/README Thu Jan 26 06:36:52 2012
@@ -0,0 +1,7 @@
+DistCp (distributed copy) is a tool used for large inter/intra-cluster copying.
+It uses Map/Reduce to effect its distribution, error handling and recovery,
+and reporting. It expands a list of files and directories into input to map tasks,
+each of which will copy a partition of the files specified in the source list.
+
+Version 0.1 (2010/08/02 sriksun)
+ - Initial Version
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/pom.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,185 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>0.23.1-SNAPSHOT</version>
+ <relativePath>../../hadoop-project</relativePath>
+ </parent>
+ <groupId>org.apache.hadoop.tools</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>0.23.1-SNAPSHOT</version>
+ <description>Apache Hadoop Distributed Copy</description>
+ <name>Apache Hadoop Distributed Copy</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <file.encoding>UTF-8</file.encoding>
+ <downloadSources>true</downloadSources>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <filtering>true</filtering>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>always</forkMode>
+ <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+ <argLine>-Xmx1024m</argLine>
+ <includes>
+ <include>**/Test*.java</include>
+ </includes>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <systemProperties>
+ <property>
+ <name>test.build.data</name>
+ <value>${basedir}/target/test/data</value>
+ </property>
+ <property>
+ <name>hadoop.log.dir</name>
+ <value>target/test/logs</value>
+ </property>
+ <property>
+ <name>org.apache.commons.logging.Log</name>
+ <value>org.apache.commons.logging.impl.SimpleLog</value>
+ </property>
+ <property>
+ <name>org.apache.commons.logging.simplelog.defaultlog</name>
+ <value>warn</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <enableRulesSummary>true</enableRulesSummary>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.hadoop.tools.DistCp</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <configuration>
+ <attach>true</attach>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pdf-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>pdf</id>
+ <phase>package</phase>
+ <goals>
+ <goal>pdf</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+
+/**
+ * The CopyListing abstraction is responsible for how the list of
+ * sources and targets is constructed, for DistCp's copy function.
+ * The copy-listing should be a SequenceFile<Text, FileStatus>,
+ * located at the path specified to buildListing(),
+ * each entry being a pair of (Source relative path, source file status),
+ * all the paths being fully qualified.
+ */
+public abstract class CopyListing extends Configured {
+
+ private Credentials credentials;
+
+ /**
+ * Build listing function creates the input listing that distcp uses to
+ * perform the copy.
+ *
+ * The built listing is a sequence file that has the relative path of a file in the key
+ * and the file status information of the source file in the value
+ *
+ * For instance if the source path is /tmp/data and the traversed path is
+ * /tmp/data/dir1/dir2/file1, then the sequence file would contain
+ *
+ * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+ *
+ * File would also contain directory entries. Meaning, if /tmp/data/dir1/dir2/file1
+ * is the only file under /tmp/data, the resulting sequence file would contain the
+ * following entries
+ *
+ * key: /dir1 and value: FileStatus(/tmp/data/dir1)
+ * key: /dir1/dir2 and value: FileStatus(/tmp/data/dir1/dir2)
+ * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+ *
+ * Cases requiring special handling:
+ * If source path is a file (/tmp/file1), contents of the file will be as follows
+ *
+ * TARGET DOES NOT EXIST: Key-"", Value-FileStatus(/tmp/file1)
+ * TARGET IS FILE : Key-"", Value-FileStatus(/tmp/file1)
+ * TARGET IS DIR : Key-"/file1", Value-FileStatus(/tmp/file1)
+ *
+ * @param pathToListFile - Output file where the listing would be stored
+ * @param options - Input options to distcp
+ * @throws IOException - Exception if any
+ */
+ public final void buildListing(Path pathToListFile,
+ DistCpOptions options) throws IOException {
+ validatePaths(options);
+ doBuildListing(pathToListFile, options);
+ Configuration config = getConf();
+
+ config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
+ config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
+ config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
+
+ checkForDuplicates(pathToListFile);
+ }
+
+ /**
+ * Validate input and output paths
+ *
+ * @param options - Input options
+ * @throws InvalidInputException: If inputs are invalid
+ * @throws IOException: any Exception with FS
+ */
+ protected abstract void validatePaths(DistCpOptions options)
+ throws IOException, InvalidInputException;
+
+ /**
+ * The interface to be implemented by sub-classes, to create the source/target file listing.
+ * @param pathToListFile Path on HDFS where the listing file is written.
+ * @param options Input Options for DistCp (indicating source/target paths.)
+ * @throws IOException: Thrown on failure to create the listing file.
+ */
+ protected abstract void doBuildListing(Path pathToListFile,
+ DistCpOptions options) throws IOException;
+
+ /**
+ * Return the total bytes that distCp should copy for the source paths
+ * This doesn't consider whether a file that is identical at the target should be skipped during copy
+ *
+ * @return total bytes to copy
+ */
+ protected abstract long getBytesToCopy();
+
+ /**
+ * Return the total number of paths to distcp, includes directories as well
+ * This doesn't consider whether file/dir is already present and should be skipped during copy
+ *
+ * @return Total number of paths to distcp
+ */
+ protected abstract long getNumberOfPaths();
+
+ /**
+ * Validate the final resulting path listing to see if there are any duplicate entries
+ *
+ * @param pathToListFile - path listing built by doBuildListing
+ * @throws IOException - Any issues encountered while checking for duplicates
+ * @throws DuplicateFileException - if there are duplicates
+ */
+ private void checkForDuplicates(Path pathToListFile)
+ throws DuplicateFileException, IOException {
+
+ Configuration config = getConf();
+ FileSystem fs = pathToListFile.getFileSystem(config);
+
+ Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ config, SequenceFile.Reader.file(sortedList));
+ try {
+ Text lastKey = new Text("*"); //source relative path can never hold *
+ FileStatus lastFileStatus = new FileStatus();
+
+ Text currentKey = new Text();
+ while (reader.next(currentKey)) {
+ if (currentKey.equals(lastKey)) {
+ FileStatus currentFileStatus = new FileStatus();
+ reader.getCurrentValue(currentFileStatus);
+ throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
+ currentFileStatus.getPath() + " would cause duplicates. Aborting");
+ }
+ reader.getCurrentValue(lastFileStatus);
+ lastKey.set(currentKey);
+ }
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ /**
+ * Protected constructor, to initialize configuration.
+ * @param configuration The input configuration,
+ * with which the source/target FileSystems may be accessed.
+ * @param credentials - Credentials object on which the FS delegation tokens are cached. If null,
+ * delegation token caching is skipped
+ */
+ protected CopyListing(Configuration configuration, Credentials credentials) {
+ setConf(configuration);
+ setCredentials(credentials);
+ }
+
+ /**
+ * Set the Credentials store, on which the FS delegation token will be cached
+ * @param credentials - Credentials object
+ */
+ protected void setCredentials(Credentials credentials) {
+ this.credentials = credentials;
+ }
+
+ /**
+ * get credentials to update the delegation tokens for accessed FS objects
+ * @return Credentials object
+ */
+ protected Credentials getCredentials() {
+ return credentials;
+ }
+
+ /**
+ * Public Factory method with which the appropriate CopyListing implementation may be retrieved.
+ * @param configuration The input configuration.
+ * @param credentials Credentials object on which the FS delegation tokens are cached
+ * @param options The input Options, to help choose the appropriate CopyListing Implementation.
+ * @return An instance of the appropriate CopyListing implementation.
+ */
+ public static CopyListing getCopyListing(Configuration configuration,
+ Credentials credentials,
+ DistCpOptions options) {
+ if (options.getSourceFileListing() == null) {
+ return new GlobbedCopyListing(configuration, credentials);
+ } else {
+ return new FileBasedCopyListing(configuration, credentials);
+ }
+ }
+
+ static class DuplicateFileException extends RuntimeException {
+ public DuplicateFileException(String message) {
+ super(message);
+ }
+ }
+
+ static class InvalidInputException extends RuntimeException {
+ public InvalidInputException(String message) {
+ super(message);
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.tools.CopyListing.*;
+import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.apache.hadoop.tools.mapred.CopyOutputFormat;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * DistCp is the main driver-class for DistCpV2.
+ * For command-line use, DistCp::main() orchestrates the parsing of command-line
+ * parameters and the launch of the DistCp job.
+ * For programmatic use, a DistCp object can be constructed by specifying
+ * options (in a DistCpOptions object), and DistCp::execute() may be used to
+ * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
+ * behaviour.
+ */
+public class DistCp extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(DistCp.class);
+
+ private DistCpOptions inputOptions;
+ private Path metaFolder;
+
+ private static final String PREFIX = "_distcp";
+ private static final String WIP_PREFIX = "._WIP_";
+ private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
+ public static final Random rand = new Random();
+
+ private boolean submitted;
+ private FileSystem jobFS;
+
+ /**
+ * Public Constructor. Creates DistCp object with specified input-parameters.
+ * (E.g. source-paths, target-location, etc.)
+ * @param inputOptions Options (indicating source-paths, target-location.)
+ * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+ * @throws Exception on failure.
+ */
+ public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+ Configuration config = new Configuration(configuration);
+ config.addResource(DISTCP_DEFAULT_XML);
+ setConf(config);
+ this.inputOptions = inputOptions;
+ this.metaFolder = createMetaFolderPath();
+ }
+
+ /**
+ * To be used with the ToolRunner. Not for public consumption.
+ */
+ private DistCp() {}
+
+ /**
+ * Implementation of Tool::run(). Orchestrates the copy of source file(s)
+ * to target location, by:
+ * 1. Creating a list of files to be copied to target.
+ * 2. Launching a Map-only job to copy the files. (Delegates to execute().)
+ * @param argv List of arguments passed to DistCp, from the ToolRunner.
+ * @return On success, it returns 0. Else, -1.
+ */
+ public int run(String[] argv) {
+ try {
+ inputOptions = (OptionsParser.parse(argv));
+
+ LOG.info("Input Options: " + inputOptions);
+ } catch (Throwable e) {
+ LOG.error("Invalid arguments: ", e);
+ System.err.println("Invalid arguments: " + e.getMessage());
+ OptionsParser.usage();
+ return DistCpConstants.INVALID_ARGUMENT;
+ }
+
+ try {
+ execute();
+ } catch (InvalidInputException e) {
+ LOG.error("Invalid input: ", e);
+ return DistCpConstants.INVALID_ARGUMENT;
+ } catch (DuplicateFileException e) {
+ LOG.error("Duplicate files in input path: ", e);
+ return DistCpConstants.DUPLICATE_INPUT;
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ return DistCpConstants.UNKNOWN_ERROR;
+ }
+ return DistCpConstants.SUCCESS;
+ }
+
+ /**
+ * Implements the core-execution. Creates the file-list for copy,
+ * and launches the Hadoop-job, to do the copy.
+ * @return Job handle
+ * @throws Exception on failure.
+ */
+ public Job execute() throws Exception {
+ assert inputOptions != null;
+ assert getConf() != null;
+
+ Job job = null;
+ try {
+ metaFolder = createMetaFolderPath();
+ jobFS = metaFolder.getFileSystem(getConf());
+
+ job = createJob();
+ createInputFileListing(job);
+
+ job.submit();
+ submitted = true;
+ } finally {
+ if (!submitted) {
+ cleanup();
+ }
+ }
+
+ String jobID = job.getJobID().toString();
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+ LOG.info("DistCp job-id: " + jobID);
+ if (inputOptions.shouldBlock()) {
+ job.waitForCompletion(true);
+ }
+ return job;
+ }
+
+ /**
+ * Create Job object for submitting it, with all the configuration
+ *
+ * @return Reference to job object.
+ * @throws IOException - Exception if any
+ */
+ private Job createJob() throws IOException {
+ String jobName = "distcp";
+ String userChosenName = getConf().get(JobContext.JOB_NAME);
+ if (userChosenName != null)
+ jobName += ": " + userChosenName;
+ Job job = Job.getInstance(getConf());
+ job.setJobName(jobName);
+ job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+ job.setJarByClass(CopyMapper.class);
+ configureOutputFormat(job);
+
+ job.setMapperClass(CopyMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputFormatClass(CopyOutputFormat.class);
+ job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
+ job.getConfiguration().set(JobContext.NUM_MAPS,
+ String.valueOf(inputOptions.getMaxMaps()));
+
+ if (inputOptions.getSslConfigurationFile() != null) {
+ setupSSLConfig(job);
+ }
+
+ inputOptions.appendToConf(job.getConfiguration());
+ return job;
+ }
+
+ /**
+ * Setup ssl configuration on the job configuration to enable hsftp access
+ * from map job. Also copy the ssl configuration file to Distributed cache
+ *
+ * @param job - Reference to job's handle
+ * @throws java.io.IOException - Exception if unable to locate ssl config file
+ */
+ private void setupSSLConfig(Job job) throws IOException {
+ Configuration configuration = job.getConfiguration();
+ Path sslConfigPath = new Path(configuration.
+ getResource(inputOptions.getSslConfigurationFile()).toString());
+
+ addSSLFilesToDistCache(job, sslConfigPath);
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
+ }
+
+ /**
+ * Add SSL files to distributed cache. Trust store, key store and ssl config xml
+ *
+ * @param job - Job handle
+ * @param sslConfigPath - ssl Configuration file specified through options
+ * @throws IOException - If any
+ */
+ private void addSSLFilesToDistCache(Job job,
+ Path sslConfigPath) throws IOException {
+ Configuration configuration = job.getConfiguration();
+ FileSystem localFS = FileSystem.getLocal(configuration);
+
+ Configuration sslConf = new Configuration(false);
+ sslConf.addResource(sslConfigPath);
+
+ Path localStorePath = getLocalStorePath(sslConf,
+ DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
+ job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
+ localStorePath.getName());
+
+ localStorePath = getLocalStorePath(sslConf,
+ DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
+ job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
+ localStorePath.getName());
+
+ job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+
+ }
+
+ /**
+ * Get Local Trust store/key store path
+ *
+ * @param sslConf - Config from SSL Client xml
+ * @param storeKey - Key for either trust store or key store
+ * @return - Path where the store is present
+ * @throws IOException -If any
+ */
+ private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
+ if (sslConf.get(storeKey) != null) {
+ return new Path(sslConf.get(storeKey));
+ } else {
+ throw new IOException("Store for " + storeKey + " is not set in " +
+ inputOptions.getSslConfigurationFile());
+ }
+ }
+
+ /**
+ * Setup output format appropriately
+ *
+ * @param job - Job handle
+ * @throws IOException - Exception if any
+ */
+ private void configureOutputFormat(Job job) throws IOException {
+ final Configuration configuration = job.getConfiguration();
+ Path targetPath = inputOptions.getTargetPath();
+ FileSystem targetFS = targetPath.getFileSystem(configuration);
+ targetPath = targetPath.makeQualified(targetFS.getUri(),
+ targetFS.getWorkingDirectory());
+
+ if (inputOptions.shouldAtomicCommit()) {
+ Path workDir = inputOptions.getAtomicWorkPath();
+ if (workDir == null) {
+ workDir = targetPath.getParent();
+ }
+ workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+ + rand.nextInt());
+ FileSystem workFS = workDir.getFileSystem(configuration);
+ if (!DistCpUtils.compareFs(targetFS, workFS)) {
+ throw new IllegalArgumentException("Work path " + workDir +
+ " and target path " + targetPath + " are in different file system");
+ }
+ CopyOutputFormat.setWorkingDirectory(job, workDir);
+ } else {
+ CopyOutputFormat.setWorkingDirectory(job, targetPath);
+ }
+ CopyOutputFormat.setCommitDirectory(job, targetPath);
+
+ Path logPath = inputOptions.getLogPath();
+ if (logPath == null) {
+ logPath = new Path(metaFolder, "_logs");
+ } else {
+ LOG.info("DistCp job log path: " + logPath);
+ }
+ CopyOutputFormat.setOutputPath(job, logPath);
+ }
+
+ /**
+ * Create input listing by invoking an appropriate copy listing
+ * implementation. Also add delegation tokens for each path
+ * to job's credential store
+ *
+ * @param job - Handle to job
+ * @return Returns the path where the copy listing is created
+ * @throws IOException - If any
+ */
+ private Path createInputFileListing(Job job) throws IOException {
+ Path fileListingPath = getFileListingPath();
+ CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
+ job.getCredentials(), inputOptions);
+ copyListing.buildListing(fileListingPath, inputOptions);
+ return fileListingPath;
+ }
+
+ /**
+ * Get default name of the copy listing file. Use the meta folder
+ * to create the copy listing file
+ *
+ * @return - Path where the copy listing file has to be saved
+ * @throws IOException - Exception if any
+ */
+ private Path getFileListingPath() throws IOException {
+ String fileListPathStr = metaFolder + "/fileList.seq";
+ Path path = new Path(fileListPathStr);
+ return new Path(path.toUri().normalize().toString());
+ }
+
+ /**
+ * Create a default working folder for the job, under the
+ * job staging directory
+ *
+ * @return Returns the working folder information
+ * @throws Exception - Exception if any
+ */
+ private Path createMetaFolderPath() throws Exception {
+ Configuration configuration = getConf();
+ Path stagingDir = JobSubmissionFiles.getStagingDir(
+ new Cluster(configuration), configuration);
+ Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
+ if (LOG.isDebugEnabled())
+ LOG.debug("Meta folder location: " + metaFolderPath);
+ configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
+ return metaFolderPath;
+ }
+
+ /**
+ * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
+ * and invokes the DistCp::run() method, via the ToolRunner.
+ * @param argv Command-line arguments sent to DistCp.
+ */
+ public static void main(String argv[]) {
+ try {
+ DistCp distCp = new DistCp();
+ Cleanup CLEANUP = new Cleanup(distCp);
+
+ Runtime.getRuntime().addShutdownHook(CLEANUP);
+ System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
+ }
+ catch (Exception e) {
+ LOG.error("Couldn't complete DistCp operation: ", e);
+ System.exit(DistCpConstants.UNKNOWN_ERROR);
+ }
+ }
+
+ /**
+ * Loads properties from distcp-default.xml into configuration
+ * object
+ * @return Configuration which includes properties from distcp-default.xml
+ */
+ private static Configuration getDefaultConf() {
+ Configuration config = new Configuration();
+ config.addResource(DISTCP_DEFAULT_XML);
+ return config;
+ }
+
+ private synchronized void cleanup() {
+ try {
+ if (metaFolder == null) return;
+
+ jobFS.delete(metaFolder, true);
+ metaFolder = null;
+ } catch (IOException e) {
+ LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
+ }
+ }
+
+ private boolean isSubmitted() {
+ return submitted;
+ }
+
+ private static class Cleanup extends Thread {
+ private final DistCp distCp;
+
+ public Cleanup(DistCp distCp) {
+ this.distCp = distCp;
+ }
+
+ @Override
+ public void run() {
+ if (distCp.isSubmitted()) return;
+
+ distCp.cleanup();
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,104 @@
+package org.apache.hadoop.tools;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class to hold commonly used constants.
+ */
+public class DistCpConstants {
+
+ /* Default number of maps to use for DistCp */
+ public static final int DEFAULT_MAPS = 20;
+
+ /* Default bandwidth if none specified */
+ public static final int DEFAULT_BANDWIDTH_MB = 100;
+
+ /* Default strategy for copying. Implementation looked up
+ from distcp-default.xml
+ */
+ public static final String UNIFORMSIZE = "uniformsize";
+
+ /**
+ * Constants mapping to command line switches/input options
+ */
+ public static final String CONF_LABEL_ATOMIC_COPY = "distcp.atomic.copy";
+ public static final String CONF_LABEL_WORK_PATH = "distcp.work.path";
+ public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
+ public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
+ public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
+ public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
+ public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+ public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+ public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
+ public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
+ public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
+ public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
+ public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+ public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+
+ /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
+ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
+
+ /* Total number of paths to copy, includes directories. Unfiltered count */
+ public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
+
+ /* SSL keystore resource */
+ public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
+
+ /* If input is specified via -f <source-listing-file>, this is the file containing the src paths */
+ public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
+
+ /* Directory where the mapreduce job will write to. If not atomic commit, then same
+ as CONF_LABEL_TARGET_FINAL_PATH
+ */
+ public static final String CONF_LABEL_TARGET_WORK_PATH = "distcp.target.work.path";
+
+ /* Directory where the final data will be committed to. If not atomic commit, then same
+ as CONF_LABEL_TARGET_WORK_PATH
+ */
+ public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
+
+ /**
+ * DistCp job id for consumers of the DistCp job
+ */
+ public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";
+
+ /* Meta folder where the job's intermediate data is kept */
+ public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+
+ /**
+ * Conf label for SSL Trust-store location.
+ */
+ public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
+ = "ssl.client.truststore.location";
+
+ /**
+ * Conf label for SSL Key-store location.
+ */
+ public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
+ = "ssl.client.keystore.location";
+
+ /**
+ * Constants for DistCp return code to shell / consumer of ToolRunner's run
+ */
+ public static final int SUCCESS = 0;
+ public static final int INVALID_ARGUMENT = -1;
+ public static final int DUPLICATE_INPUT = -2;
+ public static final int UNKNOWN_ERROR = -999;
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enumeration mapping configuration keys to distcp command line
+ * options.
+ */
+public enum DistCpOptionSwitch {
+
+ /**
+ * Ignores any failures during copy, and continues with rest.
+ * Logs failures in a file
+ */
+ IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
+ new Option("i", false, "Ignore failures during copy")),
+
+ /**
+ * Preserves status of file/path in the target.
+ * Default behavior with -p, is to preserve replication,
+ * block size, user, group and permission on the target file
+ *
+ * If any of the optional switches are present among rbugp, then
+ * only the corresponding file attribute is preserved
+ *
+ */
+ PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ new Option("p", true, "preserve status (rbugp)" +
+ "(replication, block-size, user, group, permission)")),
+
+ /**
+ * Update target location by copying only files that are missing
+ * in the target. This can be used to periodically sync two folders
+ * across source and target. Typically used with DELETE_MISSING
+ * Incompatible with ATOMIC_COMMIT
+ */
+ SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
+ new Option("update", false, "Update target, copying only missing " +
+ "files or directories")),
+
+ /**
+ * Deletes missing files in target that are missing from source
+ * This allows the target to be in sync with the source contents
+ * Typically used in conjunction with SYNC_FOLDERS
+ * Incompatible with ATOMIC_COMMIT
+ */
+ DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
+ new Option("delete", false, "Delete from target, " +
+ "files missing in source")),
+
+ /**
+ * Configuration file to use with hftps:// for securely copying
+ * files across clusters. Typically the configuration file contains
+ * truststore/keystore information such as location, password and type
+ */
+ SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
+ new Option("mapredSslConf", true, "Configuration for ssl config file" +
+ ", to use with hftps://")),
+
+ /**
+ * Max number of maps to use during copy. DistCp will split work
+ * as equally as possible among these maps
+ */
+ MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
+ new Option("m", true, "Max number of concurrent maps to use for copy")),
+
+ /**
+ * Source file listing can be provided to DistCp in a file.
+ * This allows DistCp to copy random list of files from source
+ * and copy them to target
+ */
+ SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
+ new Option("f", true, "List of files that need to be copied")),
+
+ /**
+ * Copy all the source files and commit them atomically to the target
+ * This is typically useful in cases where there is a process
+ * polling for availability of a file/dir. This option is incompatible
+ * with SYNC_FOLDERS & DELETE_MISSING
+ */
+ ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
+ new Option("atomic", false, "Commit all changes or none")),
+
+ /**
+ * Work path to be used only in conjunction in Atomic commit
+ */
+ WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
+ new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
+
+ /**
+ * Log path where distcp output logs are written to
+ */
+ LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
+ new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
+
+ /**
+ * Copy strategy to use. This could be dynamic or uniform size etc.
+ * DistCp would use an appropriate input format based on this.
+ */
+ COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
+ new Option("strategy", true, "Copy strategy to use. Default is " +
+ "dividing work based on file sizes")),
+
+ /**
+ * Skip CRC checks between source and target, when determining what
+ * files need to be copied.
+ */
+ SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
+ new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
+ "source and target paths.")),
+
+ /**
+ * Overwrite target-files unconditionally.
+ */
+ OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
+ new Option("overwrite", false, "Choose to overwrite target files " +
+ "unconditionally, even if they exist.")),
+
+ /**
+ * Should DistCp execution be blocking
+ */
+ BLOCKING("",
+ new Option("async", false, "Should distcp execution be blocking")),
+
+ FILE_LIMIT("",
+ new Option("filelimit", true, "(Deprecated!) Limit number of files " +
+ "copied to <= n")),
+
+ SIZE_LIMIT("",
+ new Option("sizelimit", true, "(Deprecated!) Limit total size of files " +
+ "copied to <= n bytes")),
+
+ /**
+ * Specify bandwidth per map in MB
+ */
+ BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+ new Option("bandwidth", true, "Specify bandwidth per map in MB"));
+
+ private final String confLabel;
+ private final Option option;
+
+ DistCpOptionSwitch(String confLabel, Option option) {
+ this.confLabel = confLabel;
+ this.option = option;
+ }
+
+ /**
+ * Get Configuration label for the option
+ * @return configuration label name
+ */
+ public String getConfigLabel() {
+ return confLabel;
+ }
+
+ /**
+ * Get CLI Option corresponding to the distcp option
+ * @return option
+ */
+ public Option getOption() {
+ return option;
+ }
+
+ /**
+ * Get Switch symbol
+ * @return switch symbol char
+ */
+ public String getSwitch() {
+ return option.getOpt();
+ }
+
+ @Override
+ public String toString() {
+ return super.name() + " {" +
+ "confLabel='" + confLabel + '\'' +
+ ", option=" + option + '}';
+ }
+
+ /**
+ * Helper function to add an option to hadoop configuration object
+ * @param conf - Configuration object to include the option
+ * @param option - Option to add
+ * @param value - Value
+ */
+ public static void addToConf(Configuration conf,
+ DistCpOptionSwitch option,
+ String value) {
+ conf.set(option.getConfigLabel(), value);
+ }
+
+ /**
+ * Helper function to set an option to hadoop configuration object
+ * @param conf - Configuration object to include the option
+ * @param option - Option to add
+ */
+ public static void addToConf(Configuration conf,
+ DistCpOptionSwitch option) {
+ conf.set(option.getConfigLabel(), "true");
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.util.DistCpUtils;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The Options class encapsulates all DistCp options.
+ * These may be set from command-line (via the OptionsParser)
+ * or may be set manually.
+ */
+public class DistCpOptions {
+
+ private boolean atomicCommit = false;
+ private boolean syncFolder = false;
+ private boolean deleteMissing = false;
+ private boolean ignoreFailures = false;
+ private boolean overwrite = false;
+ private boolean skipCRC = false;
+ private boolean blocking = true;
+
+ private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+ private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+
+ private String sslConfigurationFile;
+
+ private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+ private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+
+ private Path atomicWorkPath;
+
+ private Path logPath;
+
+ private Path sourceFileListing;
+ private List<Path> sourcePaths;
+
+ private Path targetPath;
+
+ public static enum FileAttribute{
+ REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+
+ public static FileAttribute getAttribute(char symbol) {
+ for (FileAttribute attribute : values()) {
+ if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) {
+ return attribute;
+ }
+ }
+ throw new NoSuchElementException("No attribute for " + symbol);
+ }
+ }
+
+ /**
+ * Constructor, to initialize source/target paths.
+ * @param sourcePaths List of source-paths (including wildcards)
+ * to be copied to target.
+ * @param targetPath Destination path for the dist-copy.
+ */
+ public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
+ assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
+ assert targetPath != null : "Invalid Target path";
+
+ this.sourcePaths = sourcePaths;
+ this.targetPath = targetPath;
+ }
+
+ /**
+ * Constructor, to initialize source/target paths.
+ * @param sourceFileListing File containing list of source paths
+ * @param targetPath Destination path for the dist-copy.
+ */
+ public DistCpOptions(Path sourceFileListing, Path targetPath) {
+ assert sourceFileListing != null : "Invalid source paths";
+ assert targetPath != null : "Invalid Target path";
+
+ this.sourceFileListing = sourceFileListing;
+ this.targetPath = targetPath;
+ }
+
+ /**
+ * Copy constructor.
+ * @param that DistCpOptions being copied from.
+ */
+ public DistCpOptions(DistCpOptions that) {
+ if (this != that && that != null) {
+ this.atomicCommit = that.atomicCommit;
+ this.syncFolder = that.syncFolder;
+ this.deleteMissing = that.deleteMissing;
+ this.ignoreFailures = that.ignoreFailures;
+ this.overwrite = that.overwrite;
+ this.skipCRC = that.skipCRC;
+ this.blocking = that.blocking;
+ this.maxMaps = that.maxMaps;
+ this.mapBandwidth = that.mapBandwidth;
+ this.sslConfigurationFile = that.getSslConfigurationFile();
+ this.copyStrategy = that.copyStrategy;
+ this.preserveStatus = that.preserveStatus;
+ this.atomicWorkPath = that.getAtomicWorkPath();
+ this.logPath = that.getLogPath();
+ this.sourceFileListing = that.getSourceFileListing();
+ this.sourcePaths = that.getSourcePaths();
+ this.targetPath = that.getTargetPath();
+ }
+ }
+
+ /**
+ * Should the data be committed atomically?
+ *
+ * @return true if data should be committed atomically. false otherwise
+ */
+ public boolean shouldAtomicCommit() {
+ return atomicCommit;
+ }
+
+ /**
+ * Set if data needs to be committed atomically
+ *
+ * @param atomicCommit - boolean switch
+ */
+ public void setAtomicCommit(boolean atomicCommit) {
+ validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit);
+ this.atomicCommit = atomicCommit;
+ }
+
+ /**
+ * Should the data be sync'ed between source and target paths?
+ *
+ * @return true if data should be sync'ed up. false otherwise
+ */
+ public boolean shouldSyncFolder() {
+ return syncFolder;
+ }
+
+ /**
+ * Set if source and target folder contents should be sync'ed up
+ *
+ * @param syncFolder - boolean switch
+ */
+ public void setSyncFolder(boolean syncFolder) {
+ validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder);
+ this.syncFolder = syncFolder;
+ }
+
+ /**
+ * Should target files missing in source should be deleted?
+ *
+ * @return true if zombie target files are to be removed. false otherwise
+ */
+ public boolean shouldDeleteMissing() {
+ return deleteMissing;
+ }
+
+ /**
+ * Set if files only present in target should be deleted
+ *
+ * @param deleteMissing - boolean switch
+ */
+ public void setDeleteMissing(boolean deleteMissing) {
+ validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
+ this.deleteMissing = deleteMissing;
+ }
+
+ /**
+ * Should failures be logged and ignored during copy?
+ *
+ * @return true if failures are to be logged and ignored. false otherwise
+ */
+ public boolean shouldIgnoreFailures() {
+ return ignoreFailures;
+ }
+
+ /**
+ * Set if failures during copy be ignored
+ *
+ * @param ignoreFailures - boolean switch
+ */
+ public void setIgnoreFailures(boolean ignoreFailures) {
+ this.ignoreFailures = ignoreFailures;
+ }
+
+ /**
+ * Should DistCp be running in blocking mode
+ *
+ * @return true if should run in blocking, false otherwise
+ */
+ public boolean shouldBlock() {
+ return blocking;
+ }
+
+ /**
+ * Set if DistCp should run blocking or non-blocking
+ *
+ * @param blocking - boolean switch
+ */
+ public void setBlocking(boolean blocking) {
+ this.blocking = blocking;
+ }
+
+ /**
+ * Should files be overwritten always?
+ *
+ * @return true if files in target that may exist before distcp, should always
+ * be overwritten. false otherwise
+ */
+ public boolean shouldOverwrite() {
+ return overwrite;
+ }
+
+ /**
+ * Set if files should always be overwritten on target
+ *
+ * @param overwrite - boolean switch
+ */
+ public void setOverwrite(boolean overwrite) {
+ validate(DistCpOptionSwitch.OVERWRITE, overwrite);
+ this.overwrite = overwrite;
+ }
+
+ /**
+ * Should CRC/checksum check be skipped while checking files are identical
+ *
+ * @return true if checksum check should be skipped while checking files are
+ * identical. false otherwise
+ */
+ public boolean shouldSkipCRC() {
+ return skipCRC;
+ }
+
+ /**
+ * Set if checksum comparison should be skipped while determining if
+ * source and destination files are identical
+ *
+ * @param skipCRC - boolean switch
+ */
+ public void setSkipCRC(boolean skipCRC) {
+ validate(DistCpOptionSwitch.SKIP_CRC, skipCRC);
+ this.skipCRC = skipCRC;
+ }
+
+ /** Get the max number of maps to use for this copy
+ *
+ * @return Max number of maps
+ */
+ public int getMaxMaps() {
+ return maxMaps;
+ }
+
+ /**
+ * Set the max number of maps to use for copy
+ *
+ * @param maxMaps - Number of maps
+ */
+ public void setMaxMaps(int maxMaps) {
+ this.maxMaps = maxMaps;
+ }
+
+ /** Get the map bandwidth in MB
+ *
+ * @return Bandwidth in MB
+ */
+ public int getMapBandwidth() {
+ return mapBandwidth;
+ }
+
+ /**
+ * Set per map bandwidth
+ *
+ * @param mapBandwidth - per map bandwidth
+ */
+ public void setMapBandwidth(int mapBandwidth) {
+ assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
+ this.mapBandwidth = mapBandwidth;
+ }
+
+ /**
+ * Get path where the ssl configuration file is present to use for hftps://
+ *
+ * @return Path on local file system
+ */
+ public String getSslConfigurationFile() {
+ return sslConfigurationFile;
+ }
+
+ /**
+ * Set the SSL configuration file path to use with hftps:// (local path)
+ *
+ * @param sslConfigurationFile - Local ssl config file path
+ */
+ public void setSslConfigurationFile(String sslConfigurationFile) {
+ this.sslConfigurationFile = sslConfigurationFile;
+ }
+
+ /**
+ * Returns an iterator with the list of file attributes to preserve
+ *
+ * @return iterator of file attributes to preserve
+ */
+ public Iterator<FileAttribute> preserveAttributes() {
+ return preserveStatus.iterator();
+ }
+
+ /**
+ * Checks if the input attribute should be preserved or not
+ *
+ * @param attribute - Attribute to check
+ * @return True if attribute should be preserved, false otherwise
+ */
+ public boolean shouldPreserve(FileAttribute attribute) {
+ return preserveStatus.contains(attribute);
+ }
+
+ /**
+ * Add file attributes that need to be preserved. This method may be
+ * called multiple times to add attributes.
+ *
+ * @param fileAttribute - Attribute to add, one at a time
+ */
+ public void preserve(FileAttribute fileAttribute) {
+ for (FileAttribute attribute : preserveStatus) {
+ if (attribute.equals(fileAttribute)) {
+ return;
+ }
+ }
+ preserveStatus.add(fileAttribute);
+ }
+
+ /** Get work path for atomic commit. If null, the work
+ * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
+ *
+ * @return Atomic work path on the target cluster. Null if not set
+ */
+ public Path getAtomicWorkPath() {
+ return atomicWorkPath;
+ }
+
+ /**
+ * Set the work path for atomic commit
+ *
+ * @param atomicWorkPath - Path on the target cluster
+ */
+ public void setAtomicWorkPath(Path atomicWorkPath) {
+ this.atomicWorkPath = atomicWorkPath;
+ }
+
+ /** Get output directory for writing distcp logs. Otherwise logs
+ * are temporarily written to JobStagingDir/_logs and deleted
+ * upon job completion
+ *
+ * @return Log output path on the cluster where distcp job is run
+ */
+ public Path getLogPath() {
+ return logPath;
+ }
+
+ /**
+ * Set the log path where distcp output logs are stored
+ * Uses JobStagingDir/_logs by default
+ *
+ * @param logPath - Path where logs will be saved
+ */
+ public void setLogPath(Path logPath) {
+ this.logPath = logPath;
+ }
+
+ /**
+ * Get the copy strategy to use. Uses appropriate input format
+ *
+ * @return copy strategy to use
+ */
+ public String getCopyStrategy() {
+ return copyStrategy;
+ }
+
+ /**
+ * Set the copy strategy to use. Should map to a strategy implementation
+ * in distcp-default.xml
+ *
+ * @param copyStrategy - copy Strategy to use
+ */
+ public void setCopyStrategy(String copyStrategy) {
+ this.copyStrategy = copyStrategy;
+ }
+
+ /**
+ * File path (hdfs:// or file://) that contains the list of actual
+ * files to copy
+ *
+ * @return - Source listing file path
+ */
+ public Path getSourceFileListing() {
+ return sourceFileListing;
+ }
+
+ /**
+ * Getter for sourcePaths.
+ * @return List of source-paths.
+ */
+ public List<Path> getSourcePaths() {
+ return sourcePaths;
+ }
+
+ /**
+ * Setter for sourcePaths.
+ * @param sourcePaths The new list of source-paths.
+ */
+ public void setSourcePaths(List<Path> sourcePaths) {
+ assert sourcePaths != null && sourcePaths.size() != 0;
+ this.sourcePaths = sourcePaths;
+ }
+
+ /**
+ * Getter for the targetPath.
+ * @return The target-path.
+ */
+ public Path getTargetPath() {
+ return targetPath;
+ }
+
+ public void validate(DistCpOptionSwitch option, boolean value) {
+
+ boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
+ value : this.syncFolder);
+ boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ?
+ value : this.overwrite);
+ boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ?
+ value : this.deleteMissing);
+ boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ?
+ value : this.atomicCommit);
+ boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
+ value : this.skipCRC);
+
+ if (syncFolder && atomicCommit) {
+ throw new IllegalArgumentException("Atomic commit can't be used with " +
+ "sync folder or overwrite options");
+ }
+
+ if (deleteMissing && !(overwrite || syncFolder)) {
+ throw new IllegalArgumentException("Delete missing is applicable " +
+ "only with update or overwrite options");
+ }
+
+ if (overwrite && syncFolder) {
+ throw new IllegalArgumentException("Overwrite and update options are " +
+ "mutually exclusive");
+ }
+
+ if (!syncFolder && skipCRC) {
+ throw new IllegalArgumentException("Skip CRC is valid only with update options");
+ }
+
+ }
+
+ /**
+ * Add options to configuration. These will be used in the Mapper/committer
+ *
+ * @param conf - Configuration object to which the options need to be added
+ */
+ public void appendToConf(Configuration conf) {
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
+ String.valueOf(atomicCommit));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
+ String.valueOf(ignoreFailures));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
+ String.valueOf(syncFolder));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
+ String.valueOf(deleteMissing));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
+ String.valueOf(overwrite));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
+ String.valueOf(skipCRC));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
+ String.valueOf(mapBandwidth));
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
+ DistCpUtils.packAttributes(preserveStatus));
+ }
+
+ /**
+ * Utility to easily string-ify Options, for logging.
+ *
+ * @return String representation of the Options.
+ */
+ @Override
+ public String toString() {
+ return "DistCpOptions{" +
+ "atomicCommit=" + atomicCommit +
+ ", syncFolder=" + syncFolder +
+ ", deleteMissing=" + deleteMissing +
+ ", ignoreFailures=" + ignoreFailures +
+ ", maxMaps=" + maxMaps +
+ ", sslConfigurationFile='" + sslConfigurationFile + '\'' +
+ ", copyStrategy='" + copyStrategy + '\'' +
+ ", sourceFileListing=" + sourceFileListing +
+ ", sourcePaths=" + sourcePaths +
+ ", targetPath=" + targetPath +
+ '}';
+ }
+
+ @Override
+ protected DistCpOptions clone() throws CloneNotSupportedException {
+ return (DistCpOptions) super.clone();
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ */
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ */
+public class FileBasedCopyListing extends CopyListing {
+
+  // Delegate listing that expands (globs) the paths read from the input-file.
+  private final CopyListing globbedListing;
+
+  /**
+   * Constructor, to initialize base-class.
+   * @param configuration The input Configuration object.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public FileBasedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    globbedListing = new GlobbedCopyListing(getConf(), credentials);
+  }
+
+  /**
+   * {@inheritDoc}
+   * No validation is done here; the delegate GlobbedCopyListing validates
+   * the expanded paths when buildListing() is invoked on it.
+   */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Iterates over all source paths mentioned in the input-file.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException On failure to read the source-file-listing, or to write the copy-listing.
+   */
+  @Override
+  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
+    DistCpOptions newOption = new DistCpOptions(options);
+    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, newOption);
+  }
+
+  /**
+   * Reads the list of source paths (one per line) from the specified listing file.
+   * Blank lines are skipped, since Path("") is not constructible.
+   * @param sourceListing Location of the file enumerating the source paths.
+   * @return The list of Paths read from the file.
+   * @throws IOException On failure to open or read the listing file.
+   */
+  private List<Path> fetchFileList(Path sourceListing) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = sourceListing.getFileSystem(getConf());
+    BufferedReader input = null;
+    try {
+      // Decode as UTF-8 explicitly, rather than the platform-default charset,
+      // so that the same listing file parses identically on all client machines.
+      input = new BufferedReader(new InputStreamReader(fs.open(sourceListing), "UTF-8"));
+      String line = input.readLine();
+      while (line != null) {
+        if (!line.trim().isEmpty()) { // Tolerate blank/whitespace-only lines.
+          result.add(new Path(line));
+        }
+        line = input.readLine();
+      }
+    } finally {
+      IOUtils.closeStream(input);
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return globbedListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return globbedListing.getNumberOfPaths();
+  }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * GlobbedCopyListing implements the CopyListing interface, to create the copy
+ * listing-file by "globbing" all specified source paths (wild-cards and all.)
+ */
+/**
+ * GlobbedCopyListing implements the CopyListing interface, to create the copy
+ * listing-file by "globbing" all specified source paths (wild-cards and all.)
+ */
+public class GlobbedCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(GlobbedCopyListing.class);
+
+  // Delegate listing that writes the final (fully expanded) copy-listing.
+  private final CopyListing simpleListing;
+
+  /**
+   * Constructor, to initialize the configuration.
+   * @param configuration The input Configuration object.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public GlobbedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    simpleListing = new SimpleCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Creates the copy listing by "globbing" all source-paths.
+   * @param pathToListingFile The location at which the copy-listing file
+   *                           is to be created.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException If any source path fails to glob, or the listing can't be written.
+   */
+  @Override
+  public void doBuildListing(Path pathToListingFile,
+                             DistCpOptions options) throws IOException {
+
+    List<Path> sources = options.getSourcePaths();
+    if (sources.isEmpty()) {
+      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");
+    }
+
+    List<Path> expandedPaths = new ArrayList<Path>();
+    for (Path sourcePath : sources) {
+      FileSystem fileSystem = sourcePath.getFileSystem(getConf());
+      FileStatus[] matches = fileSystem.globStatus(sourcePath);
+
+      // A null or empty glob result means the pattern matched nothing.
+      if (matches == null || matches.length == 0) {
+        throw new InvalidInputException(sourcePath + " doesn't exist");
+      }
+      for (FileStatus match : matches) {
+        expandedPaths.add(match.getPath());
+      }
+    }
+
+    DistCpOptions globbedOptions = new DistCpOptions(options);
+    globbedOptions.setSourcePaths(expandedPaths);
+    simpleListing.buildListing(pathToListingFile, globbedOptions);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return simpleListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return simpleListing.getNumberOfPaths();
+  }
+
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+
+import java.util.*;
+
+/**
+ * The OptionsParser parses out the command-line options passed to DistCp,
+ * and interprets those specific to DistCp, to create an Options object.
+ */
+public class OptionsParser {
+
+ private static final Log LOG = LogFactory.getLog(OptionsParser.class);
+
+ private static final Options cliOptions = new Options();
+
+ static {
+ for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding option " + option.getOption());
+ }
+ cliOptions.addOption(option.getOption());
+ }
+ }
+
+ private static class CustomParser extends GnuParser {
+ @Override
+ protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
+ for (int index = 0; index < arguments.length; index++) {
+ if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+ arguments[index] = "-prbugp";
+ }
+ }
+ return super.flatten(options, arguments, stopAtNonOption);
+ }
+ }
+
+ /**
+ * The parse method parses the command-line options, and creates
+ * a corresponding Options object.
+ * @param args Command-line arguments (excluding the options consumed
+ * by the GenericOptionsParser).
+ * @return The Options object, corresponding to the specified command-line.
+ * @throws IllegalArgumentException: Thrown if the parse fails.
+ */
+ public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
+
+ CommandLineParser parser = new CustomParser();
+
+ CommandLine command;
+ try {
+ command = parser.parse(cliOptions, args, true);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Unable to parse arguments. " +
+ Arrays.toString(args), e);
+ }
+
+ DistCpOptions option;
+ Path targetPath;
+ List<Path> sourcePaths = new ArrayList<Path>();
+
+ String leftOverArgs[] = command.getArgs();
+ if (leftOverArgs == null || leftOverArgs.length < 1) {
+ throw new IllegalArgumentException("Target path not specified");
+ }
+
+ //Last Argument is the target path
+ targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
+
+ //Copy any source paths in the arguments to the list
+ for (int index = 0; index < leftOverArgs.length - 1; index++) {
+ sourcePaths.add(new Path(leftOverArgs[index].trim()));
+ }
+
+ /* If command has source file listing, use it else, fall back on source paths in args
+ If both are present, throw exception and bail */
+ if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
+ if (!sourcePaths.isEmpty()) {
+ throw new IllegalArgumentException("Both source file listing and source paths present");
+ }
+ option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
+ SOURCE_FILE_LISTING.getSwitch())), targetPath);
+ } else {
+ if (sourcePaths.isEmpty()) {
+ throw new IllegalArgumentException("Neither source file listing nor source paths present");
+ }
+ option = new DistCpOptions(sourcePaths, targetPath);
+ }
+
+ //Process all the other option switches and set options appropriately
+ if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
+ option.setIgnoreFailures(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) {
+ option.setAtomicCommit(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
+ option.shouldAtomicCommit()) {
+ String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
+ if (workPath != null && !workPath.isEmpty()) {
+ option.setAtomicWorkPath(new Path(workPath));
+ }
+ } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+ throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+ option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) {
+ option.setSyncFolder(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) {
+ option.setOverwrite(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
+ option.setDeleteMissing(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) {
+ option.setSkipCRC(true);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
+ option.setBlocking(false);
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+ try {
+ Integer mapBandwidth = Integer.parseInt(
+ getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
+ option.setMapBandwidth(mapBandwidth);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+ getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
+ }
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
+ option.setSslConfigurationFile(command.
+ getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
+ try {
+ Integer maps = Integer.parseInt(
+ getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
+ option.setMaxMaps(maps);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Number of maps is invalid: " +
+ getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
+ }
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+ option.setCopyStrategy(
+ getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+ String attributes =
+ getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
+ if (attributes == null || attributes.isEmpty()) {
+ for (FileAttribute attribute : FileAttribute.values()) {
+ option.preserve(attribute);
+ }
+ } else {
+ for (int index = 0; index < attributes.length(); index++) {
+ option.preserve(FileAttribute.
+ getAttribute(attributes.charAt(index)));
+ }
+ }
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+ String fileLimitString = getVal(command,
+ DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
+ try {
+ Integer.parseInt(fileLimitString);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("File-limit is invalid: "
+ + fileLimitString, e);
+ }
+ LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+ " option. Ignoring.");
+ }
+
+ if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+ String sizeLimitString = getVal(command,
+ DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
+ try {
+ Long.parseLong(sizeLimitString);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Size-limit is invalid: "
+ + sizeLimitString, e);
+ }
+ LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+ " option. Ignoring.");
+ }
+
+ return option;
+ }
+
+ private static String getVal(CommandLine command, String swtch) {
+ String optionValue = command.getOptionValue(swtch);
+ if (optionValue == null) {
+ return null;
+ } else {
+ return optionValue.trim();
+ }
+ }
+
+ public static void usage() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);
+ }
+}