Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/03/28 06:00:10 UTC
svn commit: r1461952 [1/6] - in /hadoop/common/branches/branch-1: ./ bin/
src/docs/src/documentation/content/xdocs/ src/test/
src/test/org/apache/hadoop/tools/distcp2/
src/test/org/apache/hadoop/tools/distcp2/mapred/
src/test/org/apache/hadoop/tools/di...
Author: szetszwo
Date: Thu Mar 28 05:00:09 2013
New Revision: 1461952
URL: http://svn.apache.org/r1461952
Log:
MAPREDUCE-5081. Backport DistCpV2.
Added:
hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/distcp2.xml
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/StubContext.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestCopyListing.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestFileBasedCopyListing.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestGlobbedCopyListing.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyMapper.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyOutputFormat.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestUniformSizeInputFormat.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/lib/TestDynamicInputFormat.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestDistCpUtils.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestRetriableCommand.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/util/TestThrottledInputStream.java
hadoop/common/branches/branch-1/src/test/sslConfig.xml
hadoop/common/branches/branch-1/src/tools/distcp-default.xml
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyMapper.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyOutputFormat.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableDirectoryCreateCommand.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableFileCopyCommand.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/UniformSizeInputFormat.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputChunk.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputFormat.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicRecordReader.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/util/
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/util/DistCpUtils.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/util/RetriableCommand.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/util/ThrottledInputStream.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/bin/hadoop
hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/site.xml
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1461952&r1=1461951&r2=1461952&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Mar 28 05:00:09 2013
@@ -94,6 +94,8 @@ Release 1.2.0 - unreleased
HDFS-4597. Backport WebHDFS concat. (szetszwo)
+ MAPREDUCE-5081. Backport DistCpV2. (szetszwo)
+
IMPROVEMENTS
HADOOP-9434. Backport HADOOP-9267: hadoop -h|-{0,2}help should print usage.
Modified: hadoop/common/branches/branch-1/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/bin/hadoop?rev=1461952&r1=1461951&r2=1461952&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/bin/hadoop (original)
+++ hadoop/common/branches/branch-1/bin/hadoop Thu Mar 28 05:00:09 2013
@@ -91,6 +91,7 @@ print_usage()
echo " version print the version"
echo " jar <jar> run a jar file"
echo " distcp <srcurl> <desturl> copy file or directories recursively"
+ echo " distcp2 <srcurl> <desturl> DistCp version 2"
echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
@@ -316,6 +317,10 @@ elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.tools.DistCp
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "distcp2" ] ; then
+ CLASS=org.apache.hadoop.tools.distcp2.DistCp
+ CLASSPATH=${CLASSPATH}:${TOOL_PATH}
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
Added: hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/distcp2.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/distcp2.xml?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/distcp2.xml (added)
+++ hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/distcp2.xml Thu Mar 28 05:00:09 2013
@@ -0,0 +1,673 @@
+<?xml version="1.0"?>
+<!--
+ Copyright 2002-2004 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+
+ <header>
+ <title>DistCp Version 2 Guide</title>
+ </header>
+
+ <body>
+
+ <section id="Overview">
+ <title>Overview</title>
+
+ <p>DistCp Version 2 (distributed copy) is a tool used for large inter/intra-cluster
+ copying. It uses MapReduce to effect its distribution, error
+ handling and recovery, and reporting. It expands a list of files and
+ directories into input to map tasks, each of which will copy a partition
+ of the files specified in the source list.
+ </p>
+ <p>
+ The erstwhile implementation of DistCp has its share of quirks and
+ drawbacks, both in its usage and in its extensibility and
+ performance. The purpose of the DistCp refactor was to fix these shortcomings,
+ enabling it to be used and extended programmatically. New paradigms have
+ been introduced to improve runtime and setup performance, while simultaneously
+ retaining the legacy behaviour as default.
+ </p>
+ <p>
+ This document aims to describe the design of the new DistCp, its spanking
+ new features, their optimal use, and any deviance from the legacy
+ implementation.
+ </p>
+ </section>
+
+ <section id="Usage">
+ <title>Usage</title>
+ <section id="BasicUsage">
+ <title>Basic Usage</title>
+ <p>The most common invocation of DistCp is an inter-cluster copy:</p>
+ <p><code>bash$ hadoop distcp2 hdfs://nn1:8020/foo/bar \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code></p>
+
+ <p>This will expand the namespace under <code>/foo/bar</code> on nn1
+ into a temporary file, partition its contents among a set of map
+ tasks, and start a copy on each TaskTracker from nn1 to nn2.</p>
+
+ <p>One can also specify multiple source directories on the command
+ line:</p>
+ <p><code>bash$ hadoop distcp2 hdfs://nn1:8020/foo/a \</code><br/>
+ <code> hdfs://nn1:8020/foo/b \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code></p>
+
+ <p>Or, equivalently, from a file using the <code>-f</code> option:<br/>
+ <code>bash$ hadoop distcp2 -f hdfs://nn1:8020/srclist \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code><br/></p>
+
+ <p>Where <code>srclist</code> contains<br/>
+ <code>hdfs://nn1:8020/foo/a</code><br/>
+ <code>hdfs://nn1:8020/foo/b</code></p>
+
+ <p>When copying from multiple sources, DistCp will abort the copy with
+ an error message if two sources collide, but collisions at the
+ destination are resolved per the <a href="#CommandLineOptions">options</a>
+ specified. By default, files already existing at the destination are
+ skipped (i.e. not replaced by the source file). A count of skipped
+ files is reported at the end of each job, but it may be inaccurate if a
+ copier failed for some subset of its files, but succeeded on a later
+ attempt.</p>
+
+ <p>It is important that each TaskTracker can reach and communicate with
+ both the source and destination file systems. For HDFS, both the source
+ and destination must be running the same version of the protocol or use
+ a backwards-compatible protocol;
+ see <a href="#CopyingBetweenVersionsOfHDFS">Copying Between Versions</a>.
+ </p>
+
+ <p>After a copy, it is recommended that one generates and cross-checks
+ a listing of the source and destination to verify that the copy was
+ truly successful. Since DistCp employs both Map/Reduce and the
+ FileSystem API, issues in or between any of the three could adversely
+ and silently affect the copy. Some have had success running with
+ <code>-update</code> enabled to perform a second pass, but users should
+ be acquainted with its semantics before attempting this.</p>
+
+ <p>It's also worth noting that if another client is still writing to a
+ source file, the copy will likely fail. Attempting to overwrite a file
+ being written at the destination should also fail on HDFS. If a source
+ file is (re)moved before it is copied, the copy will fail with a
+ FileNotFoundException.</p>
+
+ <p>Please refer to the detailed Command Line Reference for information
+ on all the options available in DistCp.</p>
+ </section>
+
+ <section id="UpdateAndOverwrite">
+ <title>Update and Overwrite</title>
+ <p><code>-update</code> is used to copy files from source that don't
+ exist at the target, or whose contents differ from the target version.
+ <code>-overwrite</code> overwrites target-files even if they exist at
+ the target with the same contents.</p>
+
+ <p><br/>Update and Overwrite options warrant special attention, since their
+ handling of source-paths varies from the defaults in a very subtle manner.
+ Consider a copy from <code>/source/first/</code> and
+ <code>/source/second/</code> to <code>/target/</code>, where the source
+ paths have the following contents:</p>
+
+ <p><code>hdfs://nn1:8020/source/first/1</code><br/>
+ <code>hdfs://nn1:8020/source/first/2</code><br/>
+ <code>hdfs://nn1:8020/source/second/10</code><br/>
+ <code>hdfs://nn1:8020/source/second/20</code><br/></p>
+
+ <p><br/>When DistCp is invoked without <code>-update</code> or
+ <code>-overwrite</code>, the DistCp defaults would create directories
+ <code>first/</code> and <code>second/</code>, under <code>/target</code>.
+ Thus:<br/></p>
+
+ <p><code>distcp2 hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+ <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+ <p><code>hdfs://nn2:8020/target/first/1</code><br/>
+ <code>hdfs://nn2:8020/target/first/2</code><br/>
+ <code>hdfs://nn2:8020/target/second/10</code><br/>
+ <code>hdfs://nn2:8020/target/second/20</code><br/></p>
+
+ <p><br/>When either <code>-update</code> or <code>-overwrite</code> is
+ specified, the <strong>contents</strong> of the source-directories
+ are copied to target, and not the source directories themselves. Thus: </p>
+
+ <p><code>distcp2 -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+ <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+ <p><code>hdfs://nn2:8020/target/1</code><br/>
+ <code>hdfs://nn2:8020/target/2</code><br/>
+ <code>hdfs://nn2:8020/target/10</code><br/>
+ <code>hdfs://nn2:8020/target/20</code><br/></p>
+
+ <p><br/>By extension, if both source folders contained a file with the same
+ name (say, <code>0</code>), then both sources would map an entry to
+ <code>/target/0</code> at the destination. Rather than permit this
+ conflict, DistCp will abort.</p>
+
+ <p><br/>Now, consider the following copy operation:</p>
+
+ <p><code>distcp2 -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+ <p><br/>With sources/sizes:</p>
+
+ <p><code>hdfs://nn1:8020/source/first/1 32</code><br/>
+ <code>hdfs://nn1:8020/source/first/2 32</code><br/>
+ <code>hdfs://nn1:8020/source/second/10 64</code><br/>
+ <code>hdfs://nn1:8020/source/second/20 32</code><br/></p>
+
+ <p><br/>And destination/sizes:</p>
+
+ <p><code>hdfs://nn2:8020/target/1 32</code><br/>
+ <code>hdfs://nn2:8020/target/10 32</code><br/>
+ <code>hdfs://nn2:8020/target/20 64</code><br/></p>
+
+ <p><br/>Will effect: </p>
+
+ <p><code>hdfs://nn2:8020/target/1 32</code><br/>
+ <code>hdfs://nn2:8020/target/2 32</code><br/>
+ <code>hdfs://nn2:8020/target/10 64</code><br/>
+ <code>hdfs://nn2:8020/target/20 32</code><br/></p>
+
+ <p><br/><code>1</code> is skipped because the file-length and contents match.
+ <code>2</code> is copied because it doesn't exist at the target.
+ <code>10</code> and <code>20</code> are overwritten since the contents
+ don't match the source. </p>
+
+ <p>If <code>-overwrite</code> is used, <code>1</code> is overwritten as well.</p>
+ </section>
+ </section>
+
+ <section id="CommandLineOptions">
+ <title>Command Line Options</title>
+ <table>
+ <tr><th> Flag </th><th> Description </th><th> Notes </th></tr>
+
+ <tr><td><code>-p[rbugp]</code></td>
+ <td>Preserve<br/>
+ r: replication number<br/>
+ b: block size<br/>
+ u: user<br/>
+ g: group<br/>
+ p: permission<br/></td>
+ <td>Modification times are not preserved. Also, when
+ <code>-update</code> is specified, status updates will
+ <strong>not</strong> be synchronized unless the file sizes
+ also differ (i.e. unless the file is re-created).
+ </td></tr>
+ <tr><td><code>-i</code></td>
+ <td>Ignore failures</td>
+ <td>As explained in the Appendix, this option
+ will keep more accurate statistics about the copy than the
+ default case. It also preserves logs from failed copies, which
+ can be valuable for debugging. Finally, a failing map will not
+ cause the job to fail before all splits are attempted.
+ </td></tr>
+ <tr><td><code>-log <logdir></code></td>
+ <td>Write logs to <logdir></td>
+ <td>DistCp keeps logs of each file it attempts to copy as map
+ output. If a map fails, the log output will not be retained if
+ it is re-executed.
+ </td></tr>
+ <tr><td><code>-m <num_maps></code></td>
+ <td>Maximum number of simultaneous copies</td>
+ <td>Specify the number of maps to copy data. Note that more maps
+ may not necessarily improve throughput.
+ </td></tr>
+ <tr><td><code>-overwrite</code></td>
+ <td>Overwrite destination</td>
+ <td>If a map fails and <code>-i</code> is not specified, all the
+ files in the split, not only those that failed, will be recopied.
+ As discussed in the Usage documentation, it also changes
+ the semantics for generating destination paths, so users should
+ use this carefully.
+ </td></tr>
+ <tr><td><code>-update</code></td>
+ <td>Overwrite if src and dst differ in size or checksum</td>
+ <td>As noted in the preceding, this is not a "sync"
+ operation. The criteria examined are the source and
+ destination file sizes and checksums; if they differ, the source
+ file replaces the destination file. As discussed in the
+ Usage documentation, it also changes the semantics for
+ generating destination paths, so users should use this carefully.
+ </td></tr>
+ <tr><td><code>-f <urilist_uri></code></td>
+ <td>Use list at <urilist_uri> as src list</td>
+ <td>This is equivalent to listing each source on the command
+ line. The <code>urilist_uri</code> list should be a fully
+ qualified URI.
+ </td></tr>
+ <tr><td><code>-filelimit <n></code></td>
+ <td>Limit the total number of files to be <= n</td>
+ <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+ </td></tr>
+ <tr><td><code>-sizelimit <n></code></td>
+ <td>Limit the total size to be <= n bytes</td>
+ <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+ </td></tr>
+ <tr><td><code>-delete</code></td>
+ <td>Delete the files existing in the dst but not in src</td>
+ <td>The deletion is done by FS Shell, so the trash will be used
+ if it is enabled.
+ </td></tr>
+ <tr><td><code>-strategy {dynamic|uniformsize}</code></td>
+ <td>Choose the copy-strategy to be used in DistCp.</td>
+ <td>By default, uniformsize is used (i.e. maps are balanced on the
+ total size of the files copied by each map, similar to the legacy DistCp).
+ If "dynamic" is specified, <code>DynamicInputFormat</code> is
+ used instead. (This is described in the Architecture section,
+ under InputFormats.)
+ </td></tr>
+ <tr><td><code>-bandwidth</code></td>
+ <td>Specify bandwidth per map, in MB/second.</td>
+ <td>Each map will be restricted to consume only the specified
+ bandwidth. This is not always exact. The map throttles back
+ its bandwidth consumption during a copy, such that the
+ <strong>net</strong> bandwidth used tends towards the
+ specified value.
+ </td></tr>
+ <tr><td><code>-atomic {-tmp <tmp_dir>}</code></td>
+ <td>Specify atomic commit, with optional tmp directory.</td>
+ <td><code>-atomic</code> instructs DistCp to copy the source
+ data to a temporary target location, and then move the
+ temporary target to the final-location atomically. Data will
+ either be available at final target in a complete and consistent
+ form, or not at all.
+ Optionally, <code>-tmp</code> may be used to specify the
+ location of the tmp-target. If not specified, a default is
+ chosen. <strong>Note:</strong> tmp_dir must be on the final
+ target cluster.
+ </td></tr>
+ <tr><td><code>-mapredSslConf <ssl_conf_file></code></td>
+ <td>Specify SSL Config file, to be used with HSFTP source</td>
+ <td>When using the hsftp protocol with a source, the security-
+ related properties may be specified in a config-file and
+ passed to DistCp. <ssl_conf_file> needs to be in
+ the classpath.
+ </td></tr>
+ <tr><td><code>-async</code></td>
+ <td>Run DistCp asynchronously. Quits as soon as the Hadoop
+ Job is launched.</td>
+ <td>The Hadoop Job-id is logged, for tracking.
+ </td></tr>
+ </table>
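+
+ <p>For illustration, several of the above flags may be combined in a
+ single invocation (a hypothetical example; paths and values are
+ placeholders):</p>
+ <p><code>bash$ hadoop distcp2 -update -m 40 -bandwidth 10 \</code><br/>
+ <code> hdfs://nn1:8020/source hdfs://nn2:8020/target</code></p>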
+ </section>
+
+ <section id="ArchitectureOfDistCp">
+ <title>Architecture of DistCp</title>
+ <p>The components of the new DistCp may be classified into the following
+ categories: </p>
+
+ <ul>
+ <li>DistCp Driver</li>
+ <li>Copy-listing generator</li>
+ <li>Input-formats and Map-Reduce components</li>
+ </ul>
+
+ <section id="DistCpDriver">
+ <title>DistCp Driver</title>
+
+ <p>The DistCp Driver components are responsible for:</p>
+
+ <ul>
+ <li>Parsing the arguments passed to the DistCp command on the
+ command-line, via:
+ <ul>
+ <li>OptionsParser, and</li>
+ <li>DistCpOptionsSwitch</li>
+ </ul>
+ </li>
+ <li>Assembling the command arguments into an appropriate
+ DistCpOptions object, and initializing DistCp. These arguments
+ include:
+ <ul>
+ <li>Source-paths</li>
+ <li>Target location</li>
+ <li>Copy options (e.g. whether to update-copy, overwrite, which
+ file-attributes to preserve, etc.)</li>
+ </ul>
+ </li>
+ <li>Orchestrating the copy operation by:
+ <ul>
+ <li>Invoking the copy-listing-generator to create the list of
+ files to be copied.</li>
+ <li>Setting up and launching the Hadoop Map-Reduce Job to carry
+ out the copy.</li>
+ <li>Based on the options, either returning a handle to the
+ Hadoop MR Job immediately, or waiting till completion.</li>
+ </ul>
+ </li>
+ </ul>
+
+ <p>The parser-elements are exercised only from the command-line (or if
+ DistCp::run() is invoked). The DistCp class may also be used
+ programmatically, by constructing the DistCpOptions object, and
+ initializing a DistCp object appropriately.</p>
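+
+ <p>A minimal sketch of such programmatic use follows. It is hedged:
+ the DistCpOptions constructor and <code>setSyncFolder()</code> are
+ those used by the tests in this commit, while the
+ <code>DistCp(Configuration, DistCpOptions)</code> constructor and
+ <code>execute()</code> method are assumed from DistCpV2; the paths
+ are placeholders.</p>
+ <source>
+ import java.util.Arrays;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.tools.distcp2.DistCp;
+ import org.apache.hadoop.tools.distcp2.DistCpOptions;
+
+ public class DistCp2Example {
+   public static void main(String[] args) throws Exception {
+     Configuration conf = new Configuration();
+     // Source list and target, exactly as on the command line.
+     DistCpOptions options = new DistCpOptions(
+         Arrays.asList(new Path("hdfs://nn1:8020/foo/bar")),
+         new Path("hdfs://nn2:8020/bar/foo"));
+     options.setSyncFolder(true);  // the -update behaviour
+     // execute() launches the Hadoop MR job and waits for completion.
+     new DistCp(conf, options).execute();
+   }
+ }
+ </source>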
+ </section>
+
+ <section id="Copy-listingGenerator">
+ <title>Copy-listing Generator</title>
+
+ <p>The copy-listing-generator classes are responsible for creating the
+ list of files/directories to be copied from source. They examine
+ the contents of the source-paths (files/directories, including
+ wild-cards), and record all paths that need copy into a sequence-
+ file, for consumption by the DistCp Hadoop Job. The main classes in
+ this module include:</p>
+
+ <ol>
+ <li>CopyListing: The interface that should be implemented by any
+ copy-listing-generator implementation. Also provides the factory
+ method by which the concrete CopyListing implementation is
+ chosen.</li>
+
+ <li>SimpleCopyListing: An implementation of CopyListing that accepts
+ multiple source paths (files/directories), and recursively lists
+ all the individual files and directories under each, for
+ copy.</li>
+
+ <li>GlobbedCopyListing: Another implementation of CopyListing that
+ expands wild-cards in the source paths.</li>
+
+ <li>FileBasedCopyListing: An implementation of CopyListing that
+ reads the source-path list from a specified file.</li>
+ </ol>
+
+ <p>Based on whether a source-file-list is specified in the
+ DistCpOptions, the source-listing is generated in one of the
+ following ways:</p>
+
+ <ol>
+ <li>If there's no source-file-list, the GlobbedCopyListing is used.
+ All wild-cards are expanded, and all the expansions are
+ forwarded to the SimpleCopyListing, which in turn constructs the
+ listing (via recursive descent of each path). </li>
+
+ <li>If a source-file-list is specified, the FileBasedCopyListing is
+ used. Source-paths are read from the specified file, and then
+ forwarded to the GlobbedCopyListing. The listing is then
+ constructed as described above.</li>
+ </ol>
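+
+ <p>As a hedged illustration of this flow (a fragment using the factory
+ and builder methods exercised by the tests in this commit; the listing
+ path is a placeholder):</p>
+ <source>
+ // Build the sequence-file listing that the copy job later consumes.
+ Configuration conf = new Configuration();
+ Credentials credentials = new Credentials();
+ DistCpOptions options = new DistCpOptions(
+     Arrays.asList(new Path("hdfs://nn1:8020/source/*")),
+     new Path("hdfs://nn2:8020/target"));
+ CopyListing listing = CopyListing.getCopyListing(conf, credentials, options);
+ listing.buildListing(new Path("/tmp/distcp2.listing.seq"), options);
+ // Each record maps a relative target path to the source FileStatus.
+ </source>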
+
+ <p>One may customize the method by which the copy-listing is
+ constructed by providing a custom implementation of the CopyListing
+ interface. The behaviour of DistCp differs here from the legacy
+ DistCp, in how paths are considered for copy. </p>
+
+ <p>The legacy implementation only lists those paths that must
+ definitely be copied on to target.
+ E.g. if a file already exists at the target (and -overwrite isn't
+ specified), the file isn't even considered in the Map-Reduce Copy
+ Job. Determining this during setup (i.e. before the Map-Reduce Job)
+ involves file-size and checksum-comparisons that are potentially
+ time-consuming.</p>
+
+ <p>The new DistCp postpones such checks until the Map-Reduce Job, thus
+ reducing setup time. Performance is enhanced further since these
+ checks are parallelized across multiple maps.</p>
+ </section>
+
+ <section id="Input-formatsAndMap-ReduceComponents">
+ <title>Input-formats and Map-Reduce Components</title>
+
+ <p> The Input-formats and Map-Reduce components are responsible for
+ the actual copy of files and directories from the source to the
+ destination path. The listing-file created during copy-listing
+ generation is consumed at this point, when the copy is carried
+ out. The classes of interest here include:</p>
+
+ <ul>
+ <li><strong>UniformSizeInputFormat:</strong> This implementation of
+ org.apache.hadoop.mapreduce.InputFormat provides equivalence
+ with Legacy DistCp in balancing load across maps.
+ The aim of the UniformSizeInputFormat is to make each map copy
+ roughly the same number of bytes. Accordingly, the listing file is
+ split into groups of paths, such that the sum of file-sizes in
+ each InputSplit is nearly equal to that of every other split. The splitting
+ isn't always perfect, but its trivial implementation keeps the
+ setup-time low.</li>
+
+ <li><strong>DynamicInputFormat and DynamicRecordReader:</strong>
+ <p> The DynamicInputFormat implements org.apache.hadoop.mapreduce.InputFormat,
+ and is new to DistCp. The listing-file is split into several
+ "chunk-files", the exact number of chunk-files being a multiple
+ of the number of maps requested for in the Hadoop Job. Each map
+ task is "assigned" one of the chunk-files (by renaming the chunk
+ to the task's id), before the Job is launched.</p>
+
+ <p>Paths are read from each chunk using the DynamicRecordReader,
+ and processed in the CopyMapper. After all the paths in a chunk
+ are processed, the current chunk is deleted and a new chunk is
+ acquired. The process continues until no more chunks are
+ available.</p>
+ <p>This "dynamic" approach allows faster map-tasks to consume
+ more paths than slower ones, thus speeding up the DistCp job
+ overall. </p>
+ </li>
+
+ <li><strong>CopyMapper:</strong> This class implements the physical
+ file-copy. The input-paths are checked against the input-options
+ (specified in the Job's Configuration), to determine whether a
+ file needs copy. A file will be copied only if at least one of
+ the following is true (a sketch of this decision appears below,
+ after this list):
+ <ul>
+ <li>A file with the same name doesn't exist at target.</li>
+ <li>A file with the same name exists at target, but has a
+ different file size.</li>
+ <li>A file with the same name exists at target, but has a
+ different checksum, and -skipcrccheck isn't mentioned.</li>
+ <li>A file with the same name exists at target, but -overwrite
+ is specified.</li>
+ <li>A file with the same name exists at target, but differs in
+ block-size (and block-size needs to be preserved).</li>
+ </ul>
+ </li>
+
+ <li><strong>CopyCommitter:</strong>
+ This class is responsible for the commit-phase of the DistCp
+ job, including:
+ <ul>
+ <li>Preservation of directory-permissions (if specified in the
+ options)</li>
+ <li>Clean-up of temporary-files, work-directories, etc.</li>
+ </ul>
+ </li>
+ </ul>
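+
+ <p>The following is a sketch of that copy-decision, not the committed
+ CopyMapper code itself; <code>checksumsMatch()</code> is a hypothetical
+ helper standing in for the actual checksum comparison:</p>
+ <source>
+ boolean shouldCopy(FileStatus source, FileStatus target, boolean overwrite,
+     boolean skipCrc, boolean preserveBlockSize) throws IOException {
+   if (target == null) return true;       // doesn't exist at target
+   if (overwrite) return true;            // -overwrite forces the copy
+   if (source.getLen() != target.getLen()) return true;   // sizes differ
+   if (preserveBlockSize
+       && source.getBlockSize() != target.getBlockSize()) return true;
+   if (!skipCrc && !checksumsMatch(source, target)) return true;  // hypothetical
+   return false;                          // identical enough; skip the copy
+ }
+ </source>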
+ </section>
+ </section>
+
+ <section id="Appendix">
+ <title>Appendix</title>
+
+ <section id="MapSizing">
+ <title>Map sizing</title>
+
+ <p> By default, DistCp makes an attempt to size each map comparably so
+ that each copies roughly the same number of bytes. Note that files are the
+ finest level of granularity, so increasing the number of simultaneous
+ copiers (i.e. maps) may not always increase the number of
+ simultaneous copies nor the overall throughput.</p>
+
+ <p> The new DistCp also provides a strategy to "dynamically" size maps,
+ allowing faster data-nodes to copy more bytes than slower nodes. Using
+ <code>-strategy dynamic</code> (explained in the Architecture section), rather
+ than assigning a fixed set of source-files to each map-task, files are
+ split into several sets. The number of sets exceeds the number of
+ maps, usually by a factor of 2-3. Each map picks up and copies all files
+ listed in a chunk. When a chunk is exhausted, a new chunk is acquired and
+ processed, until no more chunks remain.</p>
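+
+ <p>For example, this strategy is selected on the command line as
+ follows (paths are placeholders):</p>
+ <p><code>bash$ hadoop distcp2 -strategy dynamic \</code><br/>
+ <code> hdfs://nn1:8020/source hdfs://nn2:8020/target</code></p>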
+
+ <p> By not assigning a source-path to a fixed map, faster map-tasks (i.e.
+ data-nodes) are able to consume more chunks, and thus copy more data,
+ than slower nodes. While this distribution isn't uniform, it is
+ <strong>fair</strong> with regard to each mapper's capacity.</p>
+
+ <p>The dynamic-strategy is implemented by the DynamicInputFormat. It
+ provides superior performance under most conditions. </p>
+
+ <p>Tuning the number of maps to the size of the source and
+ destination clusters, the size of the copy, and the available
+ bandwidth is recommended for long-running and regularly run jobs.</p>
+ </section>
+
+ <section id="CopyingBetweenVersionsOfHDFS">
+ <title>Copying Between Versions of HDFS</title>
+
+ <p>For copying between two different versions of Hadoop, one will
+ usually use HftpFileSystem. This is a read-only FileSystem, so DistCp
+ must be run on the destination cluster (more specifically, on
+ TaskTrackers that can write to the destination cluster). Each source is
+ specified as <code>hftp://<dfs.http.address>/<path></code>
+ (the default <code>dfs.http.address</code> is
+ <namenode>:50070).</p>
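+
+ <p>For example (host names and paths are placeholders), running on the
+ destination cluster:</p>
+ <p><code>bash$ hadoop distcp2 hftp://nn1.example.com:50070/src \</code><br/>
+ <code> hdfs://nn2.example.com:8020/dest</code></p>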
+ </section>
+
+ <section id="MapReduceAndOtherSide-effects">
+ <title>Map/Reduce and other side-effects</title>
+
+ <p>As has been mentioned in the preceding, should a map fail to copy
+ one of its inputs, there will be several side-effects.</p>
+
+ <ul>
+ <li>Unless <code>-overwrite</code> is specified, files successfully
+ copied by a previous map on a re-execution will be marked as
+ "skipped".</li>
+
+ <li>If a map fails <code>mapred.map.max.attempts</code> times, the
+ remaining map tasks will be killed (unless <code>-i</code> is
+ set).</li>
+
+ <li>If <code>mapred.speculative.execution</code> is set
+ <code>final</code> and <code>true</code>, the result of the copy is
+ undefined.</li>
+ </ul>
+ </section>
+
+ <section id="SSLConfigurationsForHSFTPSources">
+ <title>SSL Configurations for HSFTP sources</title>
+
+ <p>To use an HSFTP source (i.e. using the hsftp protocol), a Map-Red SSL
+ configuration file needs to be specified (via the <code>-mapredSslConf</code>
+ option). This must specify 3 parameters:</p>
+
+ <ul>
+ <li><code>ssl.client.truststore.location</code>: The local-filesystem
+ location of the trust-store file, containing the certificate for
+ the namenode.</li>
+
+ <li><code>ssl.client.truststore.type</code>: (Optional) The format of
+ the trust-store file.</li>
+
+ <li><code>ssl.client.truststore.password</code>: (Optional) Password
+ for the trust-store file.</li>
+ </ul>
+
+ <p>The following is an example of the contents of
+ a Map-Red SSL Configuration file:</p>
+
+ <p> <br/> <code> <configuration> </code> </p>
+
+ <p> <br/> <code><property> </code> </p>
+ <p> <code><name>ssl.client.truststore.location</name> </code> </p>
+ <p> <code><value>/work/keystore.jks</value> </code> </p>
+ <p> <code><description>Truststore to be used by clients like distcp. Must be specified. </description></code> </p>
+ <p> <br/> <code></property> </code> </p>
+
+ <p><code> <property> </code> </p>
+ <p> <code><name>ssl.client.truststore.password</name> </code> </p>
+ <p> <code><value>changeme</value> </code> </p>
+ <p> <code><description>Optional. Default value is "". </description> </code> </p>
+ <p> <code></property> </code> </p>
+
+ <p> <br/> <code> <property> </code> </p>
+ <p> <code> <name>ssl.client.truststore.type</name></code> </p>
+ <p> <code> <value>jks</value></code> </p>
+ <p> <code> <description>Optional. Default value is "jks". </description></code> </p>
+ <p> <code> </property> </code> </p>
+
+ <p> <code> </configuration> </code> </p>
+
+ <p><br/>The SSL configuration file must be in the class-path of the
+ DistCp program.</p>
+ </section>
+ </section>
+
+ <section id="FrequentlyAskedQuestions">
+ <title>Frequently Asked Questions</title>
+
+ <ol>
+ <li><strong>Why does -update not create the parent source-directory under
+ a pre-existing target directory?</strong>
+
+ <p>The behaviour of <code>-update</code> and <code>-overwrite</code>
+ is described in detail in the Usage section of this document. In short,
+ if either option is used with a pre-existing destination directory, the
+ <strong>contents</strong> of each source directory are copied over, rather
+ than the source-directory itself.
+ This behaviour is consistent with the legacy DistCp implementation as well.
+ </p>
+ </li>
+
+ <li><strong>How does the new DistCp differ in semantics from the Legacy
+ DistCp?</strong>
+
+ <ul>
+ <li>Files that are skipped during copy used to also have their
+ file-attributes (permissions, owner/group info, etc.) unchanged,
+ when copied with Legacy DistCp. These are now updated, even if
+ the file-copy is skipped.</li>
+ <li>Empty root directories among the source-path inputs were not
+ created at the target, in Legacy DistCp. These are now created.</li>
+ </ul>
+ </li>
+
+ <li><strong>Why does the new DistCp use more maps than legacy DistCp?</strong>
+ <p>Legacy DistCp works by figuring out what files need to be actually
+ copied to target <strong>before</strong> the copy-job is launched, and then
+ launching as many maps as required for copy. So if a majority of the files
+ need to be skipped (because they already exist, for example), fewer maps
+ will be needed. As a consequence, the time spent in setup (i.e. before the
+ M/R job) is higher.</p>
+ <p>The new DistCp calculates only the contents of the source-paths. It
+ doesn't try to filter out what files can be skipped. That decision is
+ deferred until the M/R job runs. This is much faster (vis-a-vis execution-time),
+ but the number of maps launched will be as specified in the <code>-m</code>
+ option, or 20 (default) if unspecified.</p>
+ </li>
+
+ <li><strong>Why does DistCp not run faster when more maps are specified?</strong>
+ <p>At present, the smallest unit of work for DistCp is a file. i.e.,
+ a file is processed by only one map. Increasing the number of maps to
+ a value exceeding the number of files would yield no performance
+ benefit. The number of maps launched would equal the number of files.</p>
+ </li>
+
+ <li><strong>Why does DistCp run out of memory?</strong>
+ <p>If the number of individual files/directories being copied from
+ the source path(s) is extremely large (e.g. 1,000,000 paths), DistCp might
+ run out of memory while determining the list of paths for copy. This is
+ not unique to the new DistCp implementation.</p>
+ <p>To get around this, consider changing the <code>-Xmx</code> JVM
+ heap-size parameters, as follows:</p>
+ <p><code>bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"</code></p>
+ <p><code>bash$ hadoop distcp2 /source /target</code></p>
+ </li>
+ </ol>
+ </section>
+
+ </body>
+
+</document>
Modified: hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/site.xml?rev=1461952&r1=1461951&r2=1461952&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/common/branches/branch-1/src/docs/src/documentation/content/xdocs/site.xml Thu Mar 28 05:00:09 2013
@@ -47,6 +47,7 @@ See http://forrest.apache.org/docs/linki
<streaming label="Hadoop Streaming" href="streaming.html" />
<commands label="Hadoop Commands" href="commands_manual.html" />
<distcp label="DistCp" href="distcp.html" />
+ <distcp2 label="DistCp Version 2" href="distcp2.html" />
<vaidya label="Vaidya" href="vaidya.html"/>
<archives label="Hadoop Archives" href="hadoop_archives.html"/>
<gridmix label="Gridmix" href="gridmix.html"/>
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/StubContext.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/StubContext.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/StubContext.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class StubContext {
+
+ private StubStatusReporter reporter = new StubStatusReporter();
+ private RecordReader<Text, FileStatus> reader;
+ private StubInMemoryWriter writer = new StubInMemoryWriter();
+ private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+
+ public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
+ int taskId) throws IOException, InterruptedException {
+
+ Mapper<Text, FileStatus, Text, Text> mapper
+ = new Mapper<Text, FileStatus, Text, Text>();
+
+ Mapper<Text, FileStatus, Text, Text>.Context contextImpl
+ = mapper.new Context(conf,
+ getTaskAttemptID(taskId), reader, writer,
+ null, reporter, null);
+
+ this.reader = reader;
+ this.mapperContext = contextImpl;
+ }
+
+ public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+ return mapperContext;
+ }
+
+ public StatusReporter getReporter() {
+ return reporter;
+ }
+
+ public RecordReader<Text, FileStatus> getReader() {
+ return reader;
+ }
+
+ public StubInMemoryWriter getWriter() {
+ return writer;
+ }
+
+ public static class StubStatusReporter extends StatusReporter {
+
+ private Counters counters = new Counters();
+
+ public StubStatusReporter() {
+ /*
+ final CounterGroup counterGroup
+ = new CounterGroup("FileInputFormatCounters",
+ "FileInputFormatCounters");
+ counterGroup.addCounter(new Counter("BYTES_READ",
+ "BYTES_READ",
+ 0));
+ counters.addGroup(counterGroup);
+ */
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return counters.findCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return counters.findCounter(group, name);
+ }
+
+ @Override
+ public void progress() {}
+
+ @Override
+ public float getProgress() {
+ return 0F;
+ }
+
+ @Override
+ public void setStatus(String status) {}
+ }
+
+
+ public static class StubInMemoryWriter extends RecordWriter<Text, Text> {
+
+ List<Text> keys = new ArrayList<Text>();
+
+ List<Text> values = new ArrayList<Text>();
+
+ @Override
+ public void write(Text key, Text value) throws IOException, InterruptedException {
+ keys.add(key);
+ values.add(value);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+
+ public List<Text> keys() {
+ return keys;
+ }
+
+ public List<Text> values() {
+ return values;
+ }
+
+ }
+
+ public static TaskAttemptID getTaskAttemptID(int taskId) {
+ return new TaskAttemptID("", 0, true, taskId, 0);
+ }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCopyListing extends SimpleCopyListing {
+ private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster(config, 1, true, null);
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public TestCopyListing() {
+ super(config, CREDENTIALS);
+ }
+
+ protected TestCopyListing(Configuration configuration) {
+ super(configuration, CREDENTIALS);
+ }
+
+ @Override
+ protected long getBytesToCopy() {
+ return 0;
+ }
+
+ @Override
+ protected long getNumberOfPaths() {
+ return 0;
+ }
+
+ @Test(timeout=10000)
+ public void testSkipCopy() throws Exception {
+ SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
+ @Override
+ protected boolean shouldCopy(Path path, DistCpOptions options) {
+ return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
+ }
+ };
+ FileSystem fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in4/1"));
+ srcPaths.add(new Path("/tmp/in4/2"));
+ Path target = new Path("/tmp/out4/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/2");
+ fs.mkdirs(target);
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ Path listingFile = new Path("/tmp/list4");
+ listing.buildListing(listingFile, options);
+ Assert.assertEquals(listing.getNumberOfPaths(), 2);
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ fs, listingFile, getConf());
+ FileStatus fileStatus = new FileStatus();
+ Text relativePath = new Text();
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertEquals(relativePath.toString(), "/1/file");
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertEquals(relativePath.toString(), "/2");
+ Assert.assertFalse(reader.next(relativePath, fileStatus));
+ }
+
+ @Test(timeout=10000)
+ public void testMultipleSrcToFile() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in/1"));
+ srcPaths.add(new Path("/tmp/in/2"));
+ Path target = new Path("/tmp/out/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/2");
+ fs.mkdirs(target);
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ validatePaths(options);
+ TestDistCpUtils.delete(fs, "/tmp");
+ //No errors
+
+ target = new Path("/tmp/out/1");
+ fs.create(target).close();
+ options = new DistCpOptions(srcPaths, target);
+ try {
+ validatePaths(options);
+ Assert.fail("Invalid inputs accepted");
+ } catch (InvalidInputException ignore) { }
+ TestDistCpUtils.delete(fs, "/tmp");
+
+ srcPaths.clear();
+ srcPaths.add(new Path("/tmp/in/1"));
+ fs.mkdirs(new Path("/tmp/in/1"));
+ target = new Path("/tmp/out/1");
+ fs.create(target).close();
+ options = new DistCpOptions(srcPaths, target);
+ try {
+ validatePaths(options);
+ Assert.fail("Invalid inputs accepted");
+ } catch (InvalidInputException ignore) { }
+ TestDistCpUtils.delete(fs, "/tmp");
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test input validation failed");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testDuplicates() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in/*/*"));
+ TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
+ TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
+ Path target = new Path("/tmp/out");
+ Path listingFile = new Path("/tmp/list");
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS, options);
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Duplicates not detected");
+ } catch (DuplicateFileException ignore) {
+ }
+ } catch (IOException e) {
+ LOG.error("Exception encountered in test", e);
+ Assert.fail("Test failed " + e.getMessage());
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testBuildListing() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ Path p1 = new Path("/tmp/in/1");
+ Path p2 = new Path("/tmp/in/2");
+ Path p3 = new Path("/tmp/in2/2");
+ Path target = new Path("/tmp/out/1");
+ srcPaths.add(p1.getParent());
+ srcPaths.add(p3.getParent());
+ TestDistCpUtils.createFile(fs, "/tmp/in/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/2");
+ TestDistCpUtils.createFile(fs, "/tmp/in2/2");
+ fs.mkdirs(target);
+ OutputStream out = fs.create(p1);
+ out.write("ABC".getBytes());
+ out.close();
+
+ out = fs.create(p2);
+ out.write("DEF".getBytes());
+ out.close();
+
+ out = fs.create(p3);
+ out.write("GHIJ".getBytes());
+ out.close();
+
+ Path listingFile = new Path("/tmp/file");
+
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ options.setSyncFolder(true);
+ CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Duplicates not detected");
+ } catch (DuplicateFileException ignore) {
+ }
+ Assert.assertEquals(listing.getBytesToCopy(), 10);
+ Assert.assertEquals(listing.getNumberOfPaths(), 3);
+ TestDistCpUtils.delete(fs, "/tmp");
+
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Invalid input not detected");
+ } catch (InvalidInputException ignore) {
+ }
+ TestDistCpUtils.delete(fs, "/tmp");
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test build listing failed");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testBuildListingForSingleFile() {
+ FileSystem fs = null;
+ String testRootString = "/singleFileListing";
+ Path testRoot = new Path(testRootString);
+ SequenceFile.Reader reader = null;
+ try {
+ fs = FileSystem.get(getConf());
+ if (fs.exists(testRoot))
+ TestDistCpUtils.delete(fs, testRootString);
+
+ Path sourceFile = new Path(testRoot, "/source/foo/bar/source.txt");
+ Path decoyFile = new Path(testRoot, "/target/moo/source.txt");
+ Path targetFile = new Path(testRoot, "/target/moo/target.txt");
+
+ TestDistCpUtils.createFile(fs, sourceFile.toString());
+ TestDistCpUtils.createFile(fs, decoyFile.toString());
+ TestDistCpUtils.createFile(fs, targetFile.toString());
+
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(sourceFile);
+
+ DistCpOptions options = new DistCpOptions(srcPaths, targetFile);
+ CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+ final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
+ listing.buildListing(listFile, options);
+
+ reader = new SequenceFile.Reader(fs, listFile, getConf());
+ FileStatus fileStatus = new FileStatus();
+ Text relativePath = new Text();
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertTrue(relativePath.toString().equals(""));
+ }
+ catch (Exception e) {
+ // Log before failing: Assert.fail throws, so anything after it is unreachable.
+ LOG.error("Unexpected exception: ", e);
+ Assert.fail("Unexpected exception encountered.");
+ }
+ finally {
+ TestDistCpUtils.delete(fs, testRootString);
+ IOUtils.closeStream(reader);
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestFileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestFileBasedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestFileBasedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestFileBasedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileBasedCopyListing {
+ private static final Log LOG = LogFactory.getLog(TestFileBasedCopyListing.class);
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster(config, 1, true, null);
+ fs = cluster.getFileSystem();
+ buildExpectedValuesMap();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private static Map<String, String> map = new HashMap<String, String>();
+
+ private static void buildExpectedValuesMap() {
+ map.put("/file1", "/tmp/singlefile1/file1");
+ map.put("/file2", "/tmp/singlefile2/file2");
+ map.put("/file3", "/tmp/multifile/file3");
+ map.put("/file4", "/tmp/multifile/file4");
+ map.put("/file5", "/tmp/multifile/file5");
+ map.put("/multifile/file3", "/tmp/multifile/file3");
+ map.put("/multifile/file4", "/tmp/multifile/file4");
+ map.put("/multifile/file5", "/tmp/multifile/file5");
+ map.put("/Ufile3", "/tmp/Umultifile/Ufile3");
+ map.put("/Ufile4", "/tmp/Umultifile/Ufile4");
+ map.put("/Ufile5", "/tmp/Umultifile/Ufile5");
+ map.put("/dir1", "/tmp/singledir/dir1");
+ map.put("/singledir/dir1", "/tmp/singledir/dir1");
+ map.put("/dir2", "/tmp/singledir/dir2");
+ map.put("/singledir/dir2", "/tmp/singledir/dir2");
+ map.put("/Udir1", "/tmp/Usingledir/Udir1");
+ map.put("/Udir2", "/tmp/Usingledir/Udir2");
+ map.put("/dir2/file6", "/tmp/singledir/dir2/file6");
+ map.put("/singledir/dir2/file6", "/tmp/singledir/dir2/file6");
+ map.put("/file7", "/tmp/singledir1/dir3/file7");
+ map.put("/file8", "/tmp/singledir1/dir3/file8");
+ map.put("/file9", "/tmp/singledir1/dir3/file9");
+ map.put("/dir3/file7", "/tmp/singledir1/dir3/file7");
+ map.put("/dir3/file8", "/tmp/singledir1/dir3/file8");
+ map.put("/dir3/file9", "/tmp/singledir1/dir3/file9");
+ map.put("/Ufile7", "/tmp/Usingledir1/Udir3/Ufile7");
+ map.put("/Ufile8", "/tmp/Usingledir1/Udir3/Ufile8");
+ map.put("/Ufile9", "/tmp/Usingledir1/Udir3/Ufile9");
+ }
+
+ @Test
+ public void testSingleFileMissingTarget() {
+ caseSingleFileMissingTarget(false);
+ caseSingleFileMissingTarget(true);
+ }
+
+ private void caseSingleFileMissingTarget(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile1/file1");
+ createFiles("/tmp/singlefile1/file1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 0);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetFile() {
+ caseSingleFileTargetFile(false);
+ caseSingleFileTargetFile(true);
+ }
+
+ private void caseSingleFileTargetFile(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile1/file1");
+ createFiles("/tmp/singlefile1/file1", target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 0);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleFileTargetDir() {
+ caseSingleFileTargetDir(false);
+ caseSingleFileTargetDir(true);
+ }
+
+ private void caseSingleFileTargetDir(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singlefile2/file2");
+ createFiles("/tmp/singlefile2/file2");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetMissing() {
+ caseSingleDirTargetMissing(false);
+ caseSingleDirTargetMissing(true);
+ }
+
+ private void caseSingleDirTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singledir");
+ mkdirs("/tmp/singledir/dir1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testSingleDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/singledir");
+ mkdirs("/tmp/singledir/dir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testUpdateSingleDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/Usingledir");
+ mkdirs("/tmp/Usingledir/Udir1");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 1);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetPresent() {
+ caseMultiFileTargetPresent(false);
+ caseMultiFileTargetPresent(true);
+ }
+
+ private void caseMultiFileTargetPresent(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 3);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiFileTargetMissing() {
+ caseMultiFileTargetMissing(false);
+ caseMultiFileTargetMissing(true);
+ }
+
+ private void caseMultiFileTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 3);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs(target.toString(), "/tmp/singledir/dir1");
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testUpdateMultiDirTargetPresent() {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/Umultifile", "/tmp/Usingledir");
+ createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+ mkdirs(target.toString(), "/tmp/Usingledir/Udir1");
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testMultiDirTargetMissing() {
+ caseMultiDirTargetMissing(false);
+ caseMultiDirTargetMissing(true);
+ }
+
+ private void caseMultiDirTargetMissing(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/multifile", "/tmp/singledir");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ mkdirs("/tmp/singledir/dir1");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 4);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingSingleLevel() {
+ caseGlobTargetMissingSingleLevel(false);
+ caseGlobTargetMissingSingleLevel(true);
+ }
+
+ private void caseGlobTargetMissingSingleLevel(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir/dir2/file6");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 5);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testGlobTargetMissingMultiLevel() {
+ caseGlobTargetMissingMultiLevel(false);
+ caseGlobTargetMissingMultiLevel(true);
+ }
+
+ private void caseGlobTargetMissingMultiLevel(boolean sync) {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+ "/tmp/singledir1/dir3/file9");
+
+ runTest(listFile, target, sync);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testGlobTargetDirMultiLevel() {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
+ createFiles("/tmp/singledir1/dir3/file7", "/tmp/singledir1/dir3/file8",
+ "/tmp/singledir1/dir3/file9");
+ mkdirs(target.toString());
+
+ runTest(listFile, target);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
+ @Test
+ public void testUpdateGlobTargetDirMultiLevel() {
+
+ try {
+ Path listFile = new Path("/tmp1/listing");
+ Path target = new Path("/tmp/target");
+
+ addEntries(listFile, "/tmp/*/*");
+ createFiles("/tmp/Umultifile/Ufile3", "/tmp/Umultifile/Ufile4", "/tmp/Umultifile/Ufile5");
+ createFiles("/tmp/Usingledir1/Udir3/Ufile7", "/tmp/Usingledir1/Udir3/Ufile8",
+ "/tmp/Usingledir1/Udir3/Ufile9");
+ mkdirs(target.toString());
+
+ runTest(listFile, target, true);
+
+ checkResult(listFile, 6);
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing build listing", e);
+ Assert.fail("build listing failure");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ TestDistCpUtils.delete(fs, "/tmp1");
+ }
+ }
+
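+ // Writes the given source paths, one per line, into the listing file.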
+ private void addEntries(Path listFile, String... entries) throws IOException {
+ OutputStream out = fs.create(listFile);
+ try {
+ for (String entry : entries){
+ out.write(entry.getBytes());
+ out.write("\n".getBytes());
+ }
+ } finally {
+ out.close();
+ }
+ }
+
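+ // Creates each file, writing its own path (plus a newline) as the file's content.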
+ private void createFiles(String... entries) throws IOException {
+ for (String entry : entries){
+ OutputStream out = fs.create(new Path(entry));
+ try {
+ out.write(entry.getBytes());
+ out.write("\n".getBytes());
+ } finally {
+ out.close();
+ }
+ }
+ }
+
+ private void mkdirs(String... entries) throws IOException {
+ for (String entry : entries){
+ fs.mkdirs(new Path(entry));
+ }
+ }
+
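+ // Builds the copy listing from the source-list file; the two-argument form defaults to sync mode.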
+ private void runTest(Path listFile, Path target) throws IOException {
+ runTest(listFile, target, true);
+ }
+
+ private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+ CopyListing listing = new FileBasedCopyListing(config, CREDENTIALS);
+ DistCpOptions options = new DistCpOptions(listFile, target);
+ options.setSyncFolder(sync);
+ listing.buildListing(listFile, options);
+ }
+
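+ // Verifies record-by-record that the listing matches the expected map and contains 'count' entries (skipped when count is 0).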
+ private void checkResult(Path listFile, int count) throws IOException {
+ if (count == 0) {
+ return;
+ }
+
+ int recCount = 0;
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ listFile.getFileSystem(config), listFile, config);
+ try {
+ Text relPath = new Text();
+ FileStatus fileStatus = new FileStatus();
+ while (reader.next(relPath, fileStatus)) {
+ Assert.assertEquals(map.get(relPath.toString()), fileStatus.getPath().toUri().getPath());
+ recCount++;
+ }
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ Assert.assertEquals(count, recCount);
+ }
+
+}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestGlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestGlobbedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestGlobbedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestGlobbedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGlobbedCopyListing {
+
+ private static MiniDFSCluster cluster;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ public static Map<String, String> expectedValues = new HashMap<String, String>();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+ createSourceData();
+ }
+
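+ // Lays out a small tree of nested directories and two files under /tmp/source.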
+ private static void createSourceData() throws Exception {
+ mkdirs("/tmp/source/1");
+ mkdirs("/tmp/source/2");
+ mkdirs("/tmp/source/2/3");
+ mkdirs("/tmp/source/2/3/4");
+ mkdirs("/tmp/source/5");
+ touchFile("/tmp/source/5/6");
+ mkdirs("/tmp/source/7");
+ mkdirs("/tmp/source/7/8");
+ touchFile("/tmp/source/7/8/9");
+ }
+
+ private static void mkdirs(String path) throws Exception {
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ fileSystem.mkdirs(new Path(path));
+ recordInExpectedValues(path);
+ }
+ finally {
+ IOUtils.cleanup(null, fileSystem);
+ }
+ }
+
+ private static void touchFile(String path) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ recordInExpectedValues(path);
+ }
+ finally {
+ IOUtils.cleanup(null, outputStream, fileSystem);
+ }
+ }
+
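+ // Records the fully-qualified source path against its path relative to /tmp/source.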
+ private static void recordInExpectedValues(String path) throws Exception {
+ FileSystem fileSystem = cluster.getFileSystem();
+ Path sourcePath = new Path(fileSystem.getUri().toString() + path);
+ expectedValues.put(sourcePath.toString(), DistCpUtils.getRelativePath(
+ new Path("/tmp/source"), sourcePath));
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
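+ // Builds a listing for /tmp/source with GlobbedCopyListing and verifies its contents.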
+ @Test
+ public void testRun() throws Exception {
+ final URI uri = cluster.getFileSystem().getUri();
+ final String pathString = uri.toString();
+ Path fileSystemPath = new Path(pathString);
+ Path source = new Path(fileSystemPath.toString() + "/tmp/source");
+ Path target = new Path(fileSystemPath.toString() + "/tmp/target");
+ Path listingPath = new Path(fileSystemPath.toString() + "/tmp/META/fileList.seq");
+ DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+
+ new GlobbedCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
+
+ verifyContents(listingPath);
+ }
+
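+ // Reads back the sequence file and checks every (source path -> relative path) entry against the expected map.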
+ private void verifyContents(Path listingPath) throws Exception {
+ SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
+ listingPath, new Configuration());
+ Text key = new Text();
+ FileStatus value = new FileStatus();
+ Map<String, String> actualValues = new HashMap<String, String>();
+ while (reader.next(key, value)) {
+ actualValues.put(value.getPath().toString(), key.toString());
+ }
+
+ Assert.assertEquals(expectedValues.size(), actualValues.size());
+ for (Map.Entry<String, String> entry : actualValues.entrySet()) {
+ Assert.assertEquals(expectedValues.get(entry.getKey()), entry.getValue());
+ }
+ }
+}