Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC
svn commit: r1236045 [3/5] - in /hadoop/common/trunk: hadoop-project/
hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/
hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The DynamicRecordReader is used in conjunction with the DynamicInputFormat
+ * to implement the "Worker pattern" for DistCp.
+ * The DynamicRecordReader is responsible for:
+ * 1. Presenting the contents of each chunk to DistCp's mapper.
+ * 2. Acquiring a new chunk when the current chunk has been completely consumed,
+ * transparently.
+ */
+public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
+ private static final Log LOG = LogFactory.getLog(DynamicRecordReader.class);
+ private TaskAttemptContext taskAttemptContext;
+ private Configuration configuration;
+ private DynamicInputChunk<K, V> chunk;
+ private TaskID taskId;
+
+ // Data required for progress indication.
+ private int numRecordsPerChunk; // Constant per job.
+ private int totalNumRecords; // Constant per job.
+ private int numRecordsProcessedByThisMap = 0;
+ private long timeOfLastChunkDirScan = 0;
+ private boolean isChunkDirAlreadyScanned = false;
+
+ private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
+
+ /**
+ * Implementation for RecordReader::initialize(). Initializes the internal
+ * RecordReader to read from chunks.
+ * @param inputSplit The InputSplit for the map. Ignored entirely.
+ * @param taskAttemptContext The AttemptContext.
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ numRecordsPerChunk = DynamicInputFormat.getNumEntriesPerChunk(
+ taskAttemptContext.getConfiguration());
+ this.taskAttemptContext = taskAttemptContext;
+ configuration = taskAttemptContext.getConfiguration();
+ taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
+ chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+ timeOfLastChunkDirScan = System.currentTimeMillis();
+ isChunkDirAlreadyScanned = false;
+
+ totalNumRecords = getTotalNumRecords();
+
+ }
+
+ private int getTotalNumRecords() {
+ return DistCpUtils.getInt(configuration,
+ DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+ }
+
+ /**
+   * Implementation of RecordReader::nextKeyValue().
+   * Reads the contents of the current chunk and returns them. When a chunk has
+   * been completely exhausted, a new chunk is acquired and read,
+   * transparently.
+   * @return True, if a next key/value pair could be read. False, otherwise.
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public boolean nextKeyValue()
+ throws IOException, InterruptedException {
+
+ if (chunk == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(taskId + ": RecordReader is null. No records to be read.");
+ return false;
+ }
+
+ if (chunk.getReader().nextKeyValue()) {
+ ++numRecordsProcessedByThisMap;
+ return true;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(taskId + ": Current chunk exhausted. " +
+ " Attempting to pick up new one.");
+
+ chunk.release();
+ timeOfLastChunkDirScan = System.currentTimeMillis();
+ isChunkDirAlreadyScanned = false;
+
+ chunk = DynamicInputChunk.acquire(taskAttemptContext);
+
+ if (chunk == null) return false;
+
+ if (chunk.getReader().nextKeyValue()) {
+ ++numRecordsProcessedByThisMap;
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ /**
+ * Implementation of RecordReader::getCurrentKey().
+ * @return The key of the current record. (i.e. the source-path.)
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public K getCurrentKey()
+ throws IOException, InterruptedException {
+ return chunk.getReader().getCurrentKey();
+ }
+
+ /**
+ * Implementation of RecordReader::getCurrentValue().
+ * @return The value of the current record. (i.e. the target-path.)
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public V getCurrentValue()
+ throws IOException, InterruptedException {
+ return chunk.getReader().getCurrentValue();
+ }
+
+ /**
+ * Implementation of RecordReader::getProgress().
+ * @return A fraction [0.0,1.0] indicating the progress of a DistCp mapper.
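+   * (Illustrative arithmetic: if this map has processed 12 records, chunks
+   * hold 5 records each and 3 chunks remain unclaimed, the progress reported
+   * is 12/(12 + 5*3), i.e. roughly 0.44.)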
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public float getProgress()
+ throws IOException, InterruptedException {
+ final int numChunksLeft = getNumChunksLeft();
+ if (numChunksLeft < 0) {// Un-initialized. i.e. Before 1st dir-scan.
+ assert numRecordsProcessedByThisMap <= numRecordsPerChunk
+ : "numRecordsProcessedByThisMap:" + numRecordsProcessedByThisMap +
+ " exceeds numRecordsPerChunk:" + numRecordsPerChunk;
+ return ((float) numRecordsProcessedByThisMap) / totalNumRecords;
+ // Conservative estimate, till the first directory scan.
+ }
+
+ return ((float) numRecordsProcessedByThisMap)
+ /(numRecordsProcessedByThisMap + numRecordsPerChunk*numChunksLeft);
+ }
+
+ private int getNumChunksLeft() throws IOException {
+ long now = System.currentTimeMillis();
+ boolean tooLongSinceLastDirScan
+ = now - timeOfLastChunkDirScan > TIME_THRESHOLD_FOR_DIR_SCANS;
+
+ if (tooLongSinceLastDirScan
+ || (!isChunkDirAlreadyScanned &&
+ numRecordsProcessedByThisMap%numRecordsPerChunk
+ > numRecordsPerChunk/2)) {
+ DynamicInputChunk.getListOfChunkFiles();
+ isChunkDirAlreadyScanned = true;
+ timeOfLastChunkDirScan = now;
+ }
+
+ return DynamicInputChunk.getNumChunksLeft();
+ }
+ /**
+ * Implementation of RecordReader::close().
+ * Closes the RecordReader.
+ * @throws IOException, on failure.
+ */
+ @Override
+ public void close()
+ throws IOException {
+ if (chunk != null)
+ chunk.close();
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Locale;
+import java.text.DecimalFormat;
+import java.net.URI;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Utility functions used in DistCp.
+ */
+public class DistCpUtils {
+
+ private static final Log LOG = LogFactory.getLog(DistCpUtils.class);
+
+ /**
+ * Retrieves size of the file at the specified path.
+ * @param path The path of the file whose size is sought.
+ * @param configuration Configuration, to retrieve the appropriate FileSystem.
+ * @return The file-size, in number of bytes.
+ * @throws IOException, on failure.
+ */
+ public static long getFileSize(Path path, Configuration configuration)
+ throws IOException {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Retrieving file size for: " + path);
+ return path.getFileSystem(configuration).getFileStatus(path).getLen();
+ }
+
+ /**
+ * Utility to publish a value to a configuration.
+ * @param configuration The Configuration to which the value must be written.
+ * @param label The label for the value being published.
+ * @param value The value being published.
+ * @param <T> The type of the value.
+ */
+ public static <T> void publish(Configuration configuration,
+ String label, T value) {
+ configuration.set(label, String.valueOf(value));
+ }
+
+ /**
+   * Utility to retrieve a specified key's integer value from a Configuration.
+   * Fails (via assertion) if the key is not found.
+ * @param configuration The Configuration in which the key is sought.
+ * @param label The key being sought.
+ * @return Integer value of the key.
+ */
+ public static int getInt(Configuration configuration, String label) {
+ int value = configuration.getInt(label, -1);
+ assert value >= 0 : "Couldn't find " + label;
+ return value;
+ }
+
+ /**
+   * Utility to retrieve a specified key's long value from a Configuration.
+   * Fails (via assertion) if the key is not found.
+ * @param configuration The Configuration in which the key is sought.
+ * @param label The key being sought.
+ * @return Long value of the key.
+ */
+ public static long getLong(Configuration configuration, String label) {
+ long value = configuration.getLong(label, -1);
+ assert value >= 0 : "Couldn't find " + label;
+ return value;
+ }
+
+ /**
+ * Returns the class that implements a copy strategy. Looks up the implementation for
+ * a particular strategy from distcp-default.xml
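+   * (For example, with "-strategy dynamic" the label consulted is
+   * "distcp.dynamic.strategy.impl", as defined in distcp-default.xml.)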
+ *
+ * @param conf - Configuration object
+ * @param options - Handle to input options
+ * @return Class implementing the strategy specified in options.
+ */
+ public static Class<? extends InputFormat> getStrategy(Configuration conf,
+ DistCpOptions options) {
+ String confLabel = "distcp." +
+ options.getCopyStrategy().toLowerCase(Locale.getDefault()) + ".strategy.impl";
+ return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class);
+ }
+
+ /**
+ * Gets relative path of child path with respect to a root path
+ * For ex. If childPath = /tmp/abc/xyz/file and
+ * sourceRootPath = /tmp/abc
+ * Relative path would be /xyz/file
+ * If childPath = /file and
+ * sourceRootPath = /
+ * Relative path would be /file
+ * @param sourceRootPath - Source root path
+ * @param childPath - Path for which relative path is required
+   * @return - Relative portion of the child path (always prefixed with /,
+   *           unless it is empty)
+ */
+ public static String getRelativePath(Path sourceRootPath, Path childPath) {
+ String childPathString = childPath.toUri().getPath();
+ String sourceRootPathString = sourceRootPath.toUri().getPath();
+ return sourceRootPathString.equals("/") ? childPathString :
+ childPathString.substring(sourceRootPathString.length());
+ }
+
+ /**
+ * Pack file preservation attributes into a string, containing
+ * just the first character of each preservation attribute
+ * @param attributes - Attribute set to preserve
+ * @return - String containing first letters of each attribute to preserve
+ */
+ public static String packAttributes(EnumSet<FileAttribute> attributes) {
+ StringBuffer buffer = new StringBuffer(5);
+ int len = 0;
+ for (FileAttribute attribute : attributes) {
+ buffer.append(attribute.name().charAt(0));
+ len++;
+ }
+ return buffer.substring(0, len);
+ }
+
+ /**
+   * Unpacks a preservation attribute string containing the first character of
+ * each preservation attribute back to a set of attributes to preserve
+ * @param attributes - Attribute string
+ * @return - Attribute set
+ */
+ public static EnumSet<FileAttribute> unpackAttributes(String attributes) {
+ EnumSet<FileAttribute> retValue = EnumSet.noneOf(FileAttribute.class);
+
+ if (attributes != null) {
+ for (int index = 0; index < attributes.length(); index++) {
+ retValue.add(FileAttribute.getAttribute(attributes.charAt(index)));
+ }
+ }
+
+ return retValue;
+ }
+
+ /**
+   * Preserves attributes on the given file, to match those of the file status
+   * passed as argument. Barring the block size, all the other specified
+   * attributes are preserved by this function.
+ *
+ * @param targetFS - File system
+ * @param path - Path that needs to preserve original file status
+ * @param srcFileStatus - Original file status
+ * @param attributes - Attribute set that need to be preserved
+ * @throws IOException - Exception if any (particularly relating to group/owner
+ * change or any transient error)
+ */
+ public static void preserve(FileSystem targetFS, Path path,
+ FileStatus srcFileStatus,
+ EnumSet<FileAttribute> attributes) throws IOException {
+
+ FileStatus targetFileStatus = targetFS.getFileStatus(path);
+ String group = targetFileStatus.getGroup();
+ String user = targetFileStatus.getOwner();
+ boolean chown = false;
+
+ if (attributes.contains(FileAttribute.PERMISSION) &&
+ !srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
+ targetFS.setPermission(path, srcFileStatus.getPermission());
+ }
+
+ if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() &&
+ srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
+ targetFS.setReplication(path, srcFileStatus.getReplication());
+ }
+
+ if (attributes.contains(FileAttribute.GROUP) &&
+ !group.equals(srcFileStatus.getGroup())) {
+ group = srcFileStatus.getGroup();
+ chown = true;
+ }
+
+ if (attributes.contains(FileAttribute.USER) &&
+ !user.equals(srcFileStatus.getOwner())) {
+ user = srcFileStatus.getOwner();
+ chown = true;
+ }
+
+ if (chown) {
+ targetFS.setOwner(path, user, group);
+ }
+ }
+
+ /**
+   * Sorts a sequence file containing Text keys and FileStatus values.
+ *
+ * @param fs - File System
+ * @param conf - Configuration
+ * @param sourceListing - Source listing file
+   * @return Path of the sorted file: the source listing path with "_sorted" appended to the name.
+ * @throws IOException - Any exception during sort.
+ */
+ public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
+ throws IOException {
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
+ Path output = new Path(sourceListing.toString() + "_sorted");
+
+ if (fs.exists(output)) {
+ fs.delete(output, false);
+ }
+
+ sorter.sort(sourceListing, output);
+ return output;
+ }
+
+ /**
+ * String utility to convert a number-of-bytes to human readable format.
+ */
+ private static ThreadLocal<DecimalFormat> FORMATTER
+ = new ThreadLocal<DecimalFormat>() {
+ @Override
+ protected DecimalFormat initialValue() {
+ return new DecimalFormat("0.0");
+ }
+ };
+
+ public static DecimalFormat getFormatter() {
+ return FORMATTER.get();
+ }
+
+ public static String getStringDescriptionFor(long nBytes) {
+
+ char units [] = {'B', 'K', 'M', 'G', 'T', 'P'};
+
+ double current = nBytes;
+ double prev = current;
+ int index = 0;
+
+ while ((current = current/1024) >= 1) {
+ prev = current;
+ ++index;
+ }
+
+ assert index < units.length : "Too large a number.";
+
+ return getFormatter().format(prev) + units[index];
+ }
+
+ /**
+ * Utility to compare checksums for the paths specified.
+ *
+   * If the checksums can't be retrieved, the check doesn't fail.
+   * The only time the comparison fails is when checksums are
+   * available and they don't match.
+ *
+ * @param sourceFS FileSystem for the source path.
+ * @param source The source path.
+ * @param targetFS FileSystem for the target path.
+ * @param target The target path.
+   * @return If either checksum couldn't be retrieved, the function returns
+   * true (the comparison is skipped). If both checksums are retrieved, the
+   * function returns true if they match, and false otherwise.
+ * @throws IOException if there's an exception while retrieving checksums.
+ */
+ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
+ FileSystem targetFS, Path target)
+ throws IOException {
+ FileChecksum sourceChecksum = null;
+ FileChecksum targetChecksum = null;
+ try {
+ sourceChecksum = sourceFS.getFileChecksum(source);
+ targetChecksum = targetFS.getFileChecksum(target);
+ } catch (IOException e) {
+ LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
+ }
+ return (sourceChecksum == null || targetChecksum == null ||
+ sourceChecksum.equals(targetChecksum));
+ }
+
+  /**
+   * Checks whether two file systems are the same, by comparing the scheme,
+   * canonical host name and port of their URIs.
+   */
+ public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+ URI srcUri = srcFs.getUri();
+ URI dstUri = destFs.getUri();
+ if (srcUri.getScheme() == null) {
+ return false;
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false;
+ }
+ String srcHost = srcUri.getHost();
+ String dstHost = dstUri.getHost();
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ } catch(UnknownHostException ue) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not compare file-systems. Unknown host: ", ue);
+ return false;
+ }
+ if (!srcHost.equals(dstHost)) {
+ return false;
+ }
+ }
+ else if (srcHost == null && dstHost != null) {
+ return false;
+ }
+ else if (srcHost != null) {
+ return false;
+ }
+
+ //check for ports
+
+ return srcUri.getPort() == dstUri.getPort();
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/RetriableCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents commands that can be retried on failure, in a configurable
+ * manner.
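+ *
+ * A minimal usage sketch (the subclass, file-system and path below are
+ * illustrative only, and not part of DistCp):
+ * <pre>
+ *   RetriableCommand touchCommand = new RetriableCommand("touch") {
+ *     protected Object doExecute(Object... arguments) throws Exception {
+ *       FileSystem fs = (FileSystem) arguments[0];
+ *       return fs.createNewFile((Path) arguments[1]);
+ *     }
+ *   };
+ *   touchCommand.execute(targetFS, new Path("/tmp/example"));
+ * </pre>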
+ */
+public abstract class RetriableCommand {
+
+ private static Log LOG = LogFactory.getLog(RetriableCommand.class);
+
+ private static final long DELAY_MILLISECONDS = 500;
+ private static final int MAX_RETRIES = 3;
+
+ private RetryPolicy retryPolicy = RetryPolicies.
+ exponentialBackoffRetry(MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+ protected String description;
+
+ /**
+ * Constructor.
+ * @param description The human-readable description of the command.
+ */
+ public RetriableCommand(String description) {
+ this.description = description;
+ }
+
+ /**
+ * Constructor.
+ * @param description The human-readable description of the command.
+   * @param retryPolicy The RetryPolicy to be used to compute retries.
+ */
+ public RetriableCommand(String description, RetryPolicy retryPolicy) {
+ this(description);
+ setRetryPolicy(retryPolicy);
+ }
+
+ /**
+   * Implement this method to define the command-logic that will be
+   * retried on failure (i.e. on Exception).
+ * @param arguments Argument-list to the command.
+ * @return Generic "Object".
+ * @throws Exception Throws Exception on complete failure.
+ */
+ protected abstract Object doExecute(Object... arguments) throws Exception;
+
+ /**
+ * The execute() method invokes doExecute() until either:
+ * 1. doExecute() succeeds, or
+ * 2. the command may no longer be retried (e.g. runs out of retry-attempts).
+ * @param arguments The list of arguments for the command.
+ * @return Generic "Object" from doExecute(), on success.
+   * @throws IOException, on complete failure.
+ */
+ public Object execute(Object... arguments) throws Exception {
+ Exception latestException;
+ int counter = 0;
+ do {
+ try {
+ return doExecute(arguments);
+ } catch(Exception exception) {
+ LOG.error("Failure in Retriable command: " + description, exception);
+ latestException = exception;
+ }
+ counter++;
+ } while (retryPolicy.shouldRetry(latestException, counter, 0, true).equals(RetryPolicy.RetryAction.RETRY));
+
+ throw new IOException("Couldn't run retriable-command: " + description,
+ latestException);
+ }
+
+ /**
+   * Fluent-interface to change the RetryPolicy.
+   * @param retryHandler The new RetryPolicy instance to be used.
+ * @return Self.
+ */
+ public RetriableCommand setRetryPolicy(RetryPolicy retryHandler) {
+ this.retryPolicy = retryHandler;
+ return this;
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottledInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
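+ *
+ * A minimal usage sketch (the wrapped stream and rate below are illustrative):
+ * <pre>
+ *   InputStream in = new ThrottledInputStream(
+ *       new FileInputStream("/tmp/source"), 1024 * 1024); // Cap at ~1MB/sec.
+ * </pre>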
+ */
+public class ThrottledInputStream extends InputStream {
+
+ private final InputStream rawStream;
+ private final long maxBytesPerSec;
+ private final long startTime = System.currentTimeMillis();
+
+ private long bytesRead = 0;
+ private long totalSleepTime = 0;
+
+ private static final long SLEEP_DURATION_MS = 50;
+
+ public ThrottledInputStream(InputStream rawStream) {
+ this(rawStream, Long.MAX_VALUE);
+ }
+
+ public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+ assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
+ this.rawStream = rawStream;
+ this.maxBytesPerSec = maxBytesPerSec;
+ }
+
+  /** {@inheritDoc} */
+ @Override
+ public int read() throws IOException {
+ throttle();
+ int data = rawStream.read();
+ if (data != -1) {
+ bytesRead++;
+ }
+ return data;
+ }
+
+  /** {@inheritDoc} */
+ @Override
+ public int read(byte[] b) throws IOException {
+ throttle();
+ int readLen = rawStream.read(b);
+ if (readLen != -1) {
+ bytesRead += readLen;
+ }
+ return readLen;
+ }
+
+  /** {@inheritDoc} */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throttle();
+ int readLen = rawStream.read(b, off, len);
+ if (readLen != -1) {
+ bytesRead += readLen;
+ }
+ return readLen;
+ }
+
+ private void throttle() throws IOException {
+ if (getBytesPerSec() > maxBytesPerSec) {
+ try {
+ Thread.sleep(SLEEP_DURATION_MS);
+ totalSleepTime += SLEEP_DURATION_MS;
+ } catch (InterruptedException e) {
+ throw new IOException("Thread aborted", e);
+ }
+ }
+ }
+
+ /**
+ * Getter for the number of bytes read from this stream, since creation.
+ * @return The number of bytes.
+ */
+ public long getTotalBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Getter for the read-rate from this stream, since creation.
+ * Calculated as bytesRead/elapsedTimeSinceStart.
+ * @return Read rate, in bytes/sec.
+ */
+ public long getBytesPerSec() {
+ long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+ if (elapsed == 0) {
+ return bytesRead;
+ } else {
+ return bytesRead / elapsed;
+ }
+ }
+
+ /**
+   * Getter for the total time spent in sleep.
+ * @return Number of milliseconds spent in sleep.
+ */
+ public long getTotalSleepTime() {
+ return totalSleepTime;
+ }
+
+  /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return "ThrottledInputStream{" +
+ "bytesRead=" + bytesRead +
+ ", maxBytesPerSec=" + maxBytesPerSec +
+ ", bytesPerSec=" + getBytesPerSec() +
+ ", totalSleepTime=" + totalSleepTime +
+ '}';
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly. Anything that needs to be overridden
+ should be done through -D switches or a customized conf file. -->
+
+<configuration>
+
+ <property>
+ <name>distcp.dynamic.strategy.impl</name>
+ <value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
+ <description>Implementation of dynamic input format</description>
+ </property>
+
+ <property>
+ <name>distcp.static.strategy.impl</name>
+ <value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
+ <description>Implementation of static input format</description>
+ </property>
+
+ <property>
+ <name>mapred.job.map.memory.mb</name>
+ <value>1024</value>
+ </property>
+
+ <property>
+ <name>mapred.job.reduce.memory.mb</name>
+ <value>1024</value>
+ </property>
+
+ <property>
+ <name>mapred.reducer.new-api</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>mapreduce.reduce.class</name>
+ <value>org.apache.hadoop.mapreduce.Reducer</value>
+ </property>
+
+</configuration>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/fml/faq.fml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<faqs xmlns="http://maven.apache.org/FML/1.0.1"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/FML/1.0.1 http://maven.apache.org/xsd/fml-1.0.1.xsd"
+ title="Frequently Asked Questions">
+ <part id="General">
+ <title>General</title>
+
+ <faq id="Update">
+ <question>Why does -update not create the parent source-directory under
+ a pre-existing target directory?</question>
+ <answer>The behaviour of <code>-update</code> and <code>-overwrite</code>
+ is described in detail in the Usage section of this document. In short,
+ if either option is used with a pre-existing destination directory, the
+      <strong>contents</strong> of each source directory are copied over, rather
+ than the source-directory itself.
+ This behaviour is consistent with the legacy DistCp implementation as well.
+ </answer>
+ </faq>
+
+ <faq id="Deviation">
+ <question>How does the new DistCp differ in semantics from the Legacy
+ DistCp?</question>
+ <answer>
+ <ul>
+        <li>Files that are skipped during copy used to also have their
+        file-attributes (permissions, owner/group info, etc.) left unchanged
+        by Legacy DistCp. These attributes are now updated, even if
+        the file-copy is skipped.</li>
+ <li>Empty root directories among the source-path inputs were not
+ created at the target, in Legacy DistCp. These are now created.</li>
+ </ul>
+ </answer>
+ </faq>
+
+ <faq id="nMaps">
+ <question>Why does the new DistCp use more maps than legacy DistCp?</question>
+ <answer>
+ <p>Legacy DistCp works by figuring out what files need to be actually
+ copied to target <strong>before</strong> the copy-job is launched, and then
+ launching as many maps as required for copy. So if a majority of the files
+ need to be skipped (because they already exist, for example), fewer maps
+ will be needed. As a consequence, the time spent in setup (i.e. before the
+ M/R job) is higher.</p>
+ <p>The new DistCp calculates only the contents of the source-paths. It
+      doesn't try to filter out what files can be skipped. That decision is
+      put off till the M/R job runs. This is much faster (vis-a-vis execution-time),
+ but the number of maps launched will be as specified in the <code>-m</code>
+ option, or 20 (default) if unspecified.</p>
+ </answer>
+ </faq>
+
+ <faq id="more_maps">
+ <question>Why does DistCp not run faster when more maps are specified?</question>
+ <answer>
+ <p>At present, the smallest unit of work for DistCp is a file. i.e.,
+ a file is processed by only one map. Increasing the number of maps to
+ a value exceeding the number of files would yield no performance
+      benefit. The number of maps launched would equal the number of files.</p>
+ </answer>
+ </faq>
+
+ <faq id="client_mem">
+ <question>Why does DistCp run out of memory?</question>
+ <answer>
+ <p>If the number of individual files/directories being copied from
+ the source path(s) is extremely large (e.g. 1,000,000 paths), DistCp might
+ run out of memory while determining the list of paths for copy. This is
+ not unique to the new DistCp implementation.</p>
+ <p>To get around this, consider changing the <code>-Xmx</code> JVM
+ heap-size parameters, as follows:</p>
+ <p><code>bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"</code></p>
+ <p><code>bash$ hadoop distcp /source /target</code></p>
+ </answer>
+ </faq>
+
+ </part>
+</faqs>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/pdf.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<!-- START SNIPPET: docDescriptor -->
+<document xmlns="http://maven.apache.org/DOCUMENT/1.0.1"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/DOCUMENT/1.0.1 http://maven.apache.org/xsd/document-1.0.1.xsd"
+ outputName="distcp">
+
+ <meta>
+ <title>${project.name}</title>
+ </meta>
+
+ <toc name="Table of Contents">
+ <item name="Introduction" ref="index.xml"/>
+ <item name="Usage" ref="usage.xml"/>
+ <item name="Command Line Reference" ref="cli.xml"/>
+ <item name="Architecture" ref="architecture.xml"/>
+ <item name="Appendix" ref="appendix.xml"/>
+ <item name="FAQ" ref="faq.fml"/>
+ </toc>
+ <cover>
+ <coverTitle>${project.name}</coverTitle>
+ <coverSubTitle>v. ${project.version}</coverSubTitle>
+ <coverType>User Guide</coverType>
+ <projectName>${project.name}</projectName>
+ <companyName>Apache Hadoop</companyName>
+ </cover>
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/appendix.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <head>
+ <title>Appendix</title>
+ </head>
+ <body>
+ <section name="Map sizing">
+
+ <p> By default, DistCp makes an attempt to size each map comparably so
+ that each copies roughly the same number of bytes. Note that files are the
+ finest level of granularity, so increasing the number of simultaneous
+ copiers (i.e. maps) may not always increase the number of
+ simultaneous copies nor the overall throughput.</p>
+
+ <p> The new DistCp also provides a strategy to "dynamically" size maps,
+ allowing faster data-nodes to copy more bytes than slower nodes. Using
+      <code>-strategy dynamic</code> (explained in the Architecture section), rather
+      than assigning a fixed set of source-files to each map-task, files are
+ instead split into several sets. The number of sets exceeds the number of
+ maps, usually by a factor of 2-3. Each map picks up and copies all files
+ listed in a chunk. When a chunk is exhausted, a new chunk is acquired and
+ processed, until no more chunks remain.</p>
+
+ <p> By not assigning a source-path to a fixed map, faster map-tasks (i.e.
+ data-nodes) are able to consume more chunks, and thus copy more data,
+ than slower nodes. While this distribution isn't uniform, it is
+ <strong>fair</strong> with regard to each mapper's capacity.</p>
+
+ <p>The dynamic-strategy is implemented by the DynamicInputFormat. It
+ provides superior performance under most conditions. </p>
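+
+      <p> For example, a copy using the dynamic strategy might be invoked as
+      follows (the cluster names and paths are illustrative):</p>
+      <p><code>bash$ hadoop distcp -strategy dynamic -m 20 \</code><br/>
+      <code>            hdfs://nn1:8020/source hdfs://nn2:8020/target</code></p>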
+
+ <p>Tuning the number of maps to the size of the source and
+ destination clusters, the size of the copy, and the available
+ bandwidth is recommended for long-running and regularly run jobs.</p>
+
+ </section>
+
+ <section name="Copying between versions of HDFS">
+
+ <p>For copying between two different versions of Hadoop, one will
+ usually use HftpFileSystem. This is a read-only FileSystem, so DistCp
+ must be run on the destination cluster (more specifically, on
+ TaskTrackers that can write to the destination cluster). Each source is
+ specified as <code>hftp://<dfs.http.address>/<path></code>
+ (the default <code>dfs.http.address</code> is
+ <namenode>:50070).</p>
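+
+      <p>For example (the namenode host and paths are illustrative):</p>
+      <p><code>bash$ hadoop distcp hftp://nn1.example.com:50070/foo/bar \</code><br/>
+      <code>            hdfs://nn2:8020/bar/foo</code></p>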
+
+ </section>
+
+ <section name="Map/Reduce and other side-effects">
+
+      <p>As mentioned previously, should a map fail to copy
+ one of its inputs, there will be several side-effects.</p>
+
+ <ul>
+
+ <li>Unless <code>-overwrite</code> is specified, files successfully
+ copied by a previous map on a re-execution will be marked as
+ "skipped".</li>
+
+ <li>If a map fails <code>mapred.map.max.attempts</code> times, the
+ remaining map tasks will be killed (unless <code>-i</code> is
+ set).</li>
+
+      <li>If <code>mapred.speculative.execution</code> is set
+ <code>final</code> and <code>true</code>, the result of the copy is
+ undefined.</li>
+
+ </ul>
+
+ </section>
+
+ <section name="SSL Configurations for HSFTP sources:">
+
+ <p>To use an HSFTP source (i.e. using the hsftp protocol), a Map-Red SSL
+ configuration file needs to be specified (via the <code>-mapredSslConf</code>
+ option). This must specify 3 parameters:</p>
+
+ <ul>
+ <li><code>ssl.client.truststore.location</code>: The local-filesystem
+ location of the trust-store file, containing the certificate for
+ the namenode.</li>
+
+ <li><code>ssl.client.truststore.type</code>: (Optional) The format of
+ the trust-store file.</li>
+
+ <li><code>ssl.client.truststore.password</code>: (Optional) Password
+ for the trust-store file.</li>
+
+ </ul>
+
+      <p>The following is an example of the contents of
+ a Map-Red SSL Configuration file:</p>
+
+ <p> <br/> <code> <configuration> </code> </p>
+
+ <p> <br/> <code><property> </code> </p>
+ <p> <code><name>ssl.client.truststore.location</name> </code> </p>
+ <p> <code><value>/work/keystore.jks</value> </code> </p>
+ <p> <code><description>Truststore to be used by clients like distcp. Must be specified. </description></code> </p>
+ <p> <br/> <code></property> </code> </p>
+
+ <p><code> <property> </code> </p>
+ <p> <code><name>ssl.client.truststore.password</name> </code> </p>
+ <p> <code><value>changeme</value> </code> </p>
+ <p> <code><description>Optional. Default value is "". </description> </code> </p>
+ <p> <code></property> </code> </p>
+
+ <p> <br/> <code> <property> </code> </p>
+ <p> <code> <name>ssl.client.truststore.type</name></code> </p>
+ <p> <code> <value>jks</value></code> </p>
+ <p> <code> <description>Optional. Default value is "jks". </description></code> </p>
+ <p> <code> </property> </code> </p>
+
+ <p> <code> <br/> </configuration> </code> </p>
+
+ <p><br/>The SSL configuration file must be in the class-path of the
+ DistCp program.</p>
+
+ </section>
+
+ </body>
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/architecture.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <head>
+ <title>Architecture of DistCp</title>
+ </head>
+ <body>
+ <section name="Architecture">
+
+ <p>The components of the new DistCp may be classified into the following
+ categories: </p>
+
+ <ul>
+
+ <li>DistCp Driver</li>
+ <li>Copy-listing generator</li>
+ <li>Input-formats and Map-Reduce components</li>
+
+ </ul>
+
+ <subsection name="DistCp Driver">
+ <p>The DistCp Driver components are responsible for:</p>
+
+ <ul>
+ <li>Parsing the arguments passed to the DistCp command on the
+ command-line, via:
+ <ul>
+ <li>OptionsParser, and</li>
+ <li>DistCpOptionsSwitch</li>
+ </ul>
+ </li>
+ <li>Assembling the command arguments into an appropriate
+ DistCpOptions object, and initializing DistCp. These arguments
+ include:
+ <ul>
+ <li>Source-paths</li>
+ <li>Target location</li>
+ <li>Copy options (e.g. whether to update-copy, overwrite, which
+ file-attributes to preserve, etc.)</li>
+ </ul>
+ </li>
+ <li>Orchestrating the copy operation by:
+ <ul>
+ <li>Invoking the copy-listing-generator to create the list of
+ files to be copied.</li>
+ <li>Setting up and launching the Hadoop Map-Reduce Job to carry
+ out the copy.</li>
+ <li>Based on the options, either returning a handle to the
+ Hadoop MR Job immediately, or waiting till completion.</li>
+ </ul>
+ </li>
+ </ul>
+ <br/>
+
+ <p>The parser-elements are exercised only from the command-line (or if
+ DistCp::run() is invoked). The DistCp class may also be used
+ programmatically, by constructing the DistCpOptions object, and
+ initializing a DistCp object appropriately.</p>
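+
+      <p>As a rough illustration (the exact DistCpOptions and DistCp
+      constructor/method signatures should be confirmed against the source;
+      the variables below are placeholders):</p>
+      <p><code>DistCpOptions options = new DistCpOptions(sourcePaths, targetPath);</code><br/>
+      <code>options.setSyncFolder(true);</code><br/>
+      <code>Job job = new DistCp(configuration, options).execute();</code></p>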
+
+ </subsection>
+
+ <subsection name="Copy-listing generator">
+
+ <p>The copy-listing-generator classes are responsible for creating the
+ list of files/directories to be copied from source. They examine
+ the contents of the source-paths (files/directories, including
+        wild-cards), and record all paths that need copy into a
+        sequence-file, for consumption by the DistCp Hadoop Job. The main classes in
+ this module include:</p>
+
+ <ol>
+
+ <li>CopyListing: The interface that should be implemented by any
+ copy-listing-generator implementation. Also provides the factory
+ method by which the concrete CopyListing implementation is
+ chosen.</li>
+
+ <li>SimpleCopyListing: An implementation of CopyListing that accepts
+ multiple source paths (files/directories), and recursively lists
+ all the individual files and directories under each, for
+ copy.</li>
+
+ <li>GlobbedCopyListing: Another implementation of CopyListing that
+ expands wild-cards in the source paths.</li>
+
+ <li>FileBasedCopyListing: An implementation of CopyListing that
+ reads the source-path list from a specified file.</li>
+
+ </ol>
+ <p/>
+
+ <p>Based on whether a source-file-list is specified in the
+ DistCpOptions, the source-listing is generated in one of the
+ following ways:</p>
+
+ <ol>
+
+ <li>If there's no source-file-list, the GlobbedCopyListing is used.
+ All wild-cards are expanded, and all the expansions are
+ forwarded to the SimpleCopyListing, which in turn constructs the
+ listing (via recursive descent of each path). </li>
+
+ <li>If a source-file-list is specified, the FileBasedCopyListing is
+ used. Source-paths are read from the specified file, and then
+ forwarded to the GlobbedCopyListing. The listing is then
+ constructed as described above.</li>
+
+ </ol>
+
+ <br/>
+
+ <p>One may customize the method by which the copy-listing is
+ constructed by providing a custom implementation of the CopyListing
+ interface. The behaviour of DistCp differs here from the legacy
+ DistCp, in how paths are considered for copy. </p>
+
+ <p>The legacy implementation only lists those paths that must
+ definitely be copied on to target.
+ E.g. if a file already exists at the target (and -overwrite isn't
+ specified), the file isn't even considered in the Map-Reduce Copy
+ Job. Determining this during setup (i.e. before the Map-Reduce Job)
+ involves file-size and checksum-comparisons that are potentially
+ time-consuming.</p>
+
+ <p>The new DistCp postpones such checks until the Map-Reduce Job, thus
+ reducing setup time. Performance is enhanced further since these
+ checks are parallelized across multiple maps.</p>
+
+ </subsection>
+
+ <subsection name="Input-formats and Map-Reduce components">
+
+ <p> The Input-formats and Map-Reduce components are responsible for
+ the actual copy of files and directories from the source to the
+ destination path. The listing-file created during copy-listing
+ generation is consumed at this point, when the copy is carried
+ out. The classes of interest here include:</p>
+
+ <ul>
+ <li><strong>UniformSizeInputFormat:</strong> This implementation of
+ org.apache.hadoop.mapreduce.InputFormat provides equivalence
+ with Legacy DistCp in balancing load across maps.
+ The aim of the UniformSizeInputFormat is to make each map copy
+          roughly the same number of bytes. To this end, the listing file is
+          split into groups of paths, such that the sum of file-sizes in
+          each InputSplit is nearly equal to that of every other split. The splitting
+ isn't always perfect, but its trivial implementation keeps the
+ setup-time low.</li>
+
+ <li><strong>DynamicInputFormat and DynamicRecordReader:</strong>
+ <p> The DynamicInputFormat implements org.apache.hadoop.mapreduce.InputFormat,
+ and is new to DistCp. The listing-file is split into several
+ "chunk-files", the exact number of chunk-files being a multiple
+ of the number of maps requested for in the Hadoop Job. Each map
+ task is "assigned" one of the chunk-files (by renaming the chunk
+ to the task's id), before the Job is launched.</p>
+
+ <p>Paths are read from each chunk using the DynamicRecordReader,
+ and processed in the CopyMapper. After all the paths in a chunk
+ are processed, the current chunk is deleted and a new chunk is
+ acquired. The process continues until no more chunks are
+ available.</p>
+ <p>This "dynamic" approach allows faster map-tasks to consume
+ more paths than slower ones, thus speeding up the DistCp job
+ overall. </p>
+ </li>
+
+ <li><strong>CopyMapper:</strong> This class implements the physical
+ file-copy. The input-paths are checked against the input-options
+ (specified in the Job's Configuration), to determine whether a
+ file needs copy. A file will be copied only if at least one of
+ the following is true:
+ <ul>
+ <li>A file with the same name doesn't exist at target.</li>
+ <li>A file with the same name exists at target, but has a
+ different file size.</li>
+ <li>A file with the same name exists at target, but has a
+ different checksum, and -skipcrccheck isn't mentioned.</li>
+ <li>A file with the same name exists at target, but -overwrite
+ is specified.</li>
+ <li>A file with the same name exists at target, but differs in
+            block-size (and block-size needs to be preserved).</li>
+ </ul>
+ </li>
+
+ <li><strong>CopyCommitter:</strong>
+ This class is responsible for the commit-phase of the DistCp
+ job, including:
+ <ul>
+ <li>Preservation of directory-permissions (if specified in the
+ options)</li>
+ <li>Clean-up of temporary-files, work-directories, etc.</li>
+ </ul>
+ </li>
+ </ul>
+ </subsection>
+ </section>
+ </body>
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/cli.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <head>
+ <title>Command Line Options</title>
+ </head>
+ <body>
+ <section name="Options Index">
+ <table>
+ <tr><th> Flag </th><th> Description </th><th> Notes </th></tr>
+
+ <tr><td><code>-p[rbugp]</code></td>
+ <td>Preserve<br/>
+ r: replication number<br/>
+ b: block size<br/>
+ u: user<br/>
+ g: group<br/>
+ p: permission<br/></td>
+ <td>Modification times are not preserved. Also, when
+ <code>-update</code> is specified, status updates will
+ <strong>not</strong> be synchronized unless the file sizes
+ also differ (i.e. unless the file is re-created).
+ </td></tr>
+ <tr><td><code>-i</code></td>
+ <td>Ignore failures</td>
+ <td>As explained in the Appendix, this option
+ will keep more accurate statistics about the copy than the
+ default case. It also preserves logs from failed copies, which
+ can be valuable for debugging. Finally, a failing map will not
+ cause the job to fail before all splits are attempted.
+ </td></tr>
+ <tr><td><code>-log <logdir></code></td>
+ <td>Write logs to <logdir></td>
+ <td>DistCp keeps logs of each file it attempts to copy as map
+ output. If a map fails, the log output will not be retained if
+ it is re-executed.
+ </td></tr>
+ <tr><td><code>-m <num_maps></code></td>
+ <td>Maximum number of simultaneous copies</td>
+ <td>Specify the number of maps to copy data. Note that more maps
+ may not necessarily improve throughput.
+ </td></tr>
+ <tr><td><code>-overwrite</code></td>
+ <td>Overwrite destination</td>
+ <td>If a map fails and <code>-i</code> is not specified, all the
+ files in the split, not only those that failed, will be recopied.
+ As discussed in the Usage documentation, it also changes
+ the semantics for generating destination paths, so users should
+ use this carefully.
+ </td></tr>
+ <tr><td><code>-update</code></td>
+ <td>Overwrite if src size different from dst size</td>
+          <td>As noted previously, this is not a "sync"
+ operation. The only criterion examined is the source and
+ destination file sizes; if they differ, the source file
+ replaces the destination file. As discussed in the
+ Usage documentation, it also changes the semantics for
+ generating destination paths, so users should use this carefully.
+ </td></tr>
+ <tr><td><code>-f <urilist_uri></code></td>
+ <td>Use list at <urilist_uri> as src list</td>
+ <td>This is equivalent to listing each source on the command
+ line. The <code>urilist_uri</code> list should be a fully
+ qualified URI.
+ </td></tr>
+ <tr><td><code>-filelimit <n></code></td>
+ <td>Limit the total number of files to be <= n</td>
+ <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+ </td></tr>
+ <tr><td><code>-sizelimit <n></code></td>
+ <td>Limit the total size to be <= n bytes</td>
+ <td><strong>Deprecated!</strong> Ignored in the new DistCp.
+ </td></tr>
+ <tr><td><code>-delete</code></td>
+ <td>Delete the files existing in the dst but not in src</td>
+          <td>The deletion is done by the FS Shell, so the trash will be used,
+          if it is enabled.
+ </td></tr>
+ <tr><td><code>-strategy {dynamic|uniformsize}</code></td>
+ <td>Choose the copy-strategy to be used in DistCp.</td>
+ <td>By default, uniformsize is used. (i.e. Maps are balanced on the
+ total size of files copied by each map. Similar to legacy.)
+ If "dynamic" is specified, <code>DynamicInputFormat</code> is
+ used instead. (This is described in the Architecture section,
+ under InputFormats.)
+ </td></tr>
+ <tr><td><code>-bandwidth</code></td>
+ <td>Specify bandwidth per map, in MB/second.</td>
+ <td>Each map will be restricted to consume only the specified
+ bandwidth. This is not always exact. The map throttles back
+ its bandwidth consumption during a copy, such that the
+ <strong>net</strong> bandwidth used tends towards the
+ specified value.
+ </td></tr>
+ <tr><td><code>-atomic {-tmp <tmp_dir>}</code></td>
+ <td>Specify atomic commit, with optional tmp directory.</td>
+ <td><code>-atomic</code> instructs DistCp to copy the source
+ data to a temporary target location, and then move the
+ temporary target to the final-location atomically. Data will
+ either be available at final target in a complete and consistent
+ form, or not at all.
+ Optionally, <code>-tmp</code> may be used to specify the
+ location of the tmp-target. If not specified, a default is
+ chosen. <strong>Note:</strong> tmp_dir must be on the final
+ target cluster.
+ </td></tr>
+ <tr><td><code>-mapredSslConf <ssl_conf_file></code></td>
+ <td>Specify SSL Config file, to be used with HSFTP source</td>
+          <td>When using the hsftp protocol with a source, the
+          security-related properties may be specified in a config-file and
+ passed to DistCp. <ssl_conf_file> needs to be in
+ the classpath.
+ </td></tr>
+ <tr><td><code>-async</code></td>
+ <td>Run DistCp asynchronously. Quits as soon as the Hadoop
+ Job is launched.</td>
+ <td>The Hadoop Job-id is logged, for tracking.
+ </td></tr>
+ </table>
+ </section>
+ </body>
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/index.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <head>
+ <title>DistCp</title>
+ </head>
+ <body>
+ <section name="Overview">
+ <p>
+ DistCp (distributed copy) is a tool used for large inter/intra-cluster
+ copying. It uses Map/Reduce to effect its distribution, error
+ handling and recovery, and reporting. It expands a list of files and
+ directories into input to map tasks, each of which will copy a partition
+ of the files specified in the source list.
+ </p>
+ <p>
+ The erstwhile implementation of DistCp has its share of quirks and
+ drawbacks, both in its usage and in its extensibility and
+ performance. The purpose of the DistCp refactor was to fix these shortcomings,
+ enabling it to be used and extended programmatically. New paradigms have
+ been introduced to improve runtime and setup performance, while
+ retaining the legacy behaviour as the default.
+ </p>
+ <p>
+ This document aims to describe the design of the new DistCp, its spanking
+ new features, their optimal use, and any deviations from the legacy
+ implementation.
+ </p>
+ </section>
+ </body>
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/site/xdoc/usage.xml Thu Jan 26 06:36:52 2012
@@ -0,0 +1,147 @@
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <head>
+ <title>Usage </title>
+ </head>
+ <body>
+ <section name="Basic Usage">
+ <p>The most common invocation of DistCp is an inter-cluster copy:</p>
+ <p><code>bash$ hadoop jar hadoop-distcp.jar hdfs://nn1:8020/foo/bar \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code></p>
+
+ <p>This will expand the namespace under <code>/foo/bar</code> on nn1
+ into a temporary file, partition its contents among a set of map
+ tasks, and start a copy on each TaskTracker from nn1 to nn2.</p>
+
+ <p>One can also specify multiple source directories on the command
+ line:</p>
+ <p><code>bash$ hadoop jar hadoop-distcp.jar hdfs://nn1:8020/foo/a \</code><br/>
+ <code> hdfs://nn1:8020/foo/b \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code></p>
+
+ <p>Or, equivalently, from a file using the <code>-f</code> option:<br/>
+ <code>bash$ hadoop jar hadoop-distcp.jar -f hdfs://nn1:8020/srclist \</code><br/>
+ <code> hdfs://nn2:8020/bar/foo</code><br/></p>
+
+ <p>Where <code>srclist</code> contains<br/>
+ <code>hdfs://nn1:8020/foo/a</code><br/>
+ <code>hdfs://nn1:8020/foo/b</code></p>
+
+ <p>When copying from multiple sources, DistCp will abort the copy with
+ an error message if two sources collide, but collisions at the
+ destination are resolved per the <a href="#options">options</a>
+ specified. By default, files already existing at the destination are
+ skipped (i.e. not replaced by the source file). A count of skipped
+ files is reported at the end of each job, but it may be inaccurate if a
+ copier failed for some subset of its files, but succeeded on a later
+ attempt.</p>
+
+ <p>It is important that each TaskTracker can reach and communicate with
+ both the source and destination file systems. For HDFS, both the source
+ and destination must be running the same version of the protocol or use
+ a backwards-compatible protocol (see <a href="#cpver">Copying Between
+ Versions</a>).</p>
+
+ <p>After a copy, it is recommended that one generate and cross-check
+ a listing of the source and destination to verify that the copy was
+ truly successful. Since DistCp employs both Map/Reduce and the
+ FileSystem API, issues in or between any of the three (DistCp,
+ Map/Reduce and the FileSystem API) could adversely and silently affect
+ the copy. Some have had success running with <code>-update</code>
+ enabled to perform a second pass, but users should be acquainted with
+ its semantics before attempting this.</p>
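+
+ <p>One possible way to perform such a cross-check (a sketch only, with
+ the placeholder paths used above) is to compare recursive listings of
+ the source and the target once the job completes:</p>
+
+ <p><code>bash$ hadoop fs -ls -R hdfs://nn1:8020/foo/bar > source.lst</code><br/>
+ <code>bash$ hadoop fs -ls -R hdfs://nn2:8020/bar/foo > target.lst</code></p>
+
+ <p>The two listings can then be compared (after normalizing the path
+ prefixes) to confirm that every source file arrived with the expected
+ size.</p>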
+
+ <p>It's also worth noting that if another client is still writing to a
+ source file, the copy will likely fail. Attempting to overwrite a file
+ being written at the destination should also fail on HDFS. If a source
+ file is (re)moved before it is copied, the copy will fail with a
+ FileNotFoundException.</p>
+
+ <p>Please refer to the detailed Command Line Reference for information
+ on all the options available in DistCp.</p>
+
+ </section>
+ <section name="Update and Overwrite">
+
+ <p><code>-update</code> is used to copy files from the source that don't
+ exist at the target, or whose contents differ from the target version.
+ <code>-overwrite</code> overwrites files at the target even when they
+ already exist there with the same contents as the source.</p>
+
+ <p><br/>Update and Overwrite options warrant special attention, since their
+ handling of source-paths varies from the defaults in a very subtle manner.
+ Consider a copy from <code>/source/first/</code> and
+ <code>/source/second/</code> to <code>/target/</code>, where the source
+ paths have the following contents:</p>
+
+ <p><code>hdfs://nn1:8020/source/first/1</code><br/>
+ <code>hdfs://nn1:8020/source/first/2</code><br/>
+ <code>hdfs://nn1:8020/source/second/10</code><br/>
+ <code>hdfs://nn1:8020/source/second/20</code><br/></p>
+
+ <p><br/>When DistCp is invoked without <code>-update</code> or
+ <code>-overwrite</code>, the DistCp defaults would create directories
+ <code>first/</code> and <code>second/</code>, under <code>/target</code>.
+ Thus:<br/></p>
+
+ <p><code>distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+ <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+ <p><code>hdfs://nn2:8020/target/first/1</code><br/>
+ <code>hdfs://nn2:8020/target/first/2</code><br/>
+ <code>hdfs://nn2:8020/target/second/10</code><br/>
+ <code>hdfs://nn2:8020/target/second/20</code><br/></p>
+
+ <p><br/>When either <code>-update</code> or <code>-overwrite</code> is
+ specified, the <strong>contents</strong> of the source-directories
+ are copied to target, and not the source directories themselves. Thus: </p>
+
+ <p><code>distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+ <p><br/>would yield the following contents in <code>/target</code>: </p>
+
+ <p><code>hdfs://nn2:8020/target/1</code><br/>
+ <code>hdfs://nn2:8020/target/2</code><br/>
+ <code>hdfs://nn2:8020/target/10</code><br/>
+ <code>hdfs://nn2:8020/target/20</code><br/></p>
+
+ <p><br/>By extension, if both source folders contained a file with the same
+ name (say, <code>0</code>), then both sources would map an entry to
+ <code>/target/0</code> at the destination. Rather than permit this
+ conflict, DistCp will abort.</p>
+
+ <p><br/>Now, consider the following copy operation, invoked with
+ <code>-update</code>:</p>
+
+ <p><code>distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>
+
+ <p><br/>With sources/sizes:</p>
+
+ <p><code>hdfs://nn1:8020/source/first/1 32</code><br/>
+ <code>hdfs://nn1:8020/source/first/2 32</code><br/>
+ <code>hdfs://nn1:8020/source/second/10 64</code><br/>
+ <code>hdfs://nn1:8020/source/second/20 32</code><br/></p>
+
+ <p><br/>And destination/sizes:</p>
+
+ <p><code>hdfs://nn2:8020/target/1 32</code><br/>
+ <code>hdfs://nn2:8020/target/10 32</code><br/>
+ <code>hdfs://nn2:8020/target/20 64</code><br/></p>
+
+ <p><br/>The copy will produce: </p>
+
+ <p><code>hdfs://nn2:8020/target/1 32</code><br/>
+ <code>hdfs://nn2:8020/target/2 32</code><br/>
+ <code>hdfs://nn2:8020/target/10 64</code><br/>
+ <code>hdfs://nn2:8020/target/20 32</code><br/></p>
+
+ <p><br/><code>1</code> is skipped because the file-length and contents match.
+ <code>2</code> is copied because it doesn't exist at the target.
+ <code>10</code> and <code>20</code> are overwritten since the contents
+ don't match the source. </p>
+
+ <p>If <code>-overwrite</code> is used instead, <code>1</code> is overwritten as well.</p>
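+
+ <p>For completeness, the corresponding <code>-overwrite</code> run on the
+ same source and target (again, a sketch with the placeholder paths used
+ above) would simply swap the flag:</p>
+
+ <p><code>distcp -overwrite hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target</code></p>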
+
+ </section>
+ </body>
+
+</document>
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
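+/**
+ * Test-only stand-in for the MapReduce runtime: wraps a Mapper.Context
+ * (via WrappedMapper/MapContextImpl) around a stub StatusReporter and an
+ * in-memory RecordWriter, so that DistCp mappers and record-readers can be
+ * exercised in unit tests without launching a real job.
+ */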
+public class StubContext {
+
+ private StubStatusReporter reporter = new StubStatusReporter();
+ private RecordReader<Text, FileStatus> reader;
+ private StubInMemoryWriter writer = new StubInMemoryWriter();
+ private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
+
+ public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
+ int taskId) throws IOException, InterruptedException {
+
+ WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
+ = new WrappedMapper<Text, FileStatus, Text, Text>();
+
+ MapContextImpl<Text, FileStatus, Text, Text> contextImpl
+ = new MapContextImpl<Text, FileStatus, Text, Text>(conf,
+ getTaskAttemptID(taskId), reader, writer,
+ null, reporter, null);
+
+ this.reader = reader;
+ this.mapperContext = wrappedMapper.getMapContext(contextImpl);
+ }
+
+ public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
+ return mapperContext;
+ }
+
+ public StatusReporter getReporter() {
+ return reporter;
+ }
+
+ public RecordReader<Text, FileStatus> getReader() {
+ return reader;
+ }
+
+ public StubInMemoryWriter getWriter() {
+ return writer;
+ }
+
+ public static class StubStatusReporter extends StatusReporter {
+
+ private Counters counters = new Counters();
+
+ public StubStatusReporter() {
+ /*
+ final CounterGroup counterGroup
+ = new CounterGroup("FileInputFormatCounters",
+ "FileInputFormatCounters");
+ counterGroup.addCounter(new Counter("BYTES_READ",
+ "BYTES_READ",
+ 0));
+ counters.addGroup(counterGroup);
+ */
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return counters.findCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return counters.findCounter(group, name);
+ }
+
+ @Override
+ public void progress() {}
+
+ @Override
+ public float getProgress() {
+ return 0F;
+ }
+
+ @Override
+ public void setStatus(String status) {}
+ }
+
+
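+ /**
+ * RecordWriter that buffers all keys and values in memory, allowing tests
+ * to inspect the mapper's output after the map has run.
+ */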
+ public static class StubInMemoryWriter extends RecordWriter<Text, Text> {
+
+ List<Text> keys = new ArrayList<Text>();
+
+ List<Text> values = new ArrayList<Text>();
+
+ @Override
+ public void write(Text key, Text value) throws IOException, InterruptedException {
+ keys.add(key);
+ values.add(value);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+
+ public List<Text> keys() {
+ return keys;
+ }
+
+ public List<Text> values() {
+ return values;
+ }
+
+ }
+
+ public static TaskAttemptID getTaskAttemptID(int taskId) {
+ return new TaskAttemptID("", 0, TaskType.MAP, taskId, 0);
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.util.TestDistCpUtils;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.ArrayList;
+
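+/**
+ * Unit tests for SimpleCopyListing: path validation, duplicate detection
+ * and listing construction, exercised against an in-process MiniDFSCluster.
+ */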
+public class TestCopyListing extends SimpleCopyListing {
+ private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+ private static final Configuration config = new Configuration();
+ private static MiniDFSCluster cluster;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
+ .build();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public TestCopyListing() {
+ super(config, CREDENTIALS);
+ }
+
+ protected TestCopyListing(Configuration configuration) {
+ super(configuration, CREDENTIALS);
+ }
+
+ @Override
+ protected long getBytesToCopy() {
+ return 0;
+ }
+
+ @Override
+ protected long getNumberOfPaths() {
+ return 0;
+ }
+
+ @Test
+ public void testMultipleSrcToFile() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in/1"));
+ srcPaths.add(new Path("/tmp/in/2"));
+ Path target = new Path("/tmp/out/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/2");
+ fs.mkdirs(target);
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ validatePaths(options);
+ TestDistCpUtils.delete(fs, "/tmp");
+ //No errors
+
+ target = new Path("/tmp/out/1");
+ fs.create(target).close();
+ options = new DistCpOptions(srcPaths, target);
+ try {
+ validatePaths(options);
+ Assert.fail("Invalid inputs accepted");
+ } catch (InvalidInputException ignore) { }
+ TestDistCpUtils.delete(fs, "/tmp");
+
+ srcPaths.clear();
+ srcPaths.add(new Path("/tmp/in/1"));
+ fs.mkdirs(new Path("/tmp/in/1"));
+ target = new Path("/tmp/out/1");
+ fs.create(target).close();
+ options = new DistCpOptions(srcPaths, target);
+ try {
+ validatePaths(options);
+ Assert.fail("Invalid inputs accepted");
+ } catch (InvalidInputException ignore) { }
+ TestDistCpUtils.delete(fs, "/tmp");
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test input validation failed");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testDuplicates() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in/*/*"));
+ TestDistCpUtils.createFile(fs, "/tmp/in/1.txt");
+ TestDistCpUtils.createFile(fs, "/tmp/in/src/1.txt");
+ Path target = new Path("/tmp/out");
+ Path listingFile = new Path("/tmp/list");
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS, options);
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Duplicates not detected");
+ } catch (DuplicateFileException ignore) {
+ }
+ } catch (IOException e) {
+ LOG.error("Exception encountered in test", e);
+ Assert.fail("Test failed " + e.getMessage());
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testBuildListing() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ Path p1 = new Path("/tmp/in/1");
+ Path p2 = new Path("/tmp/in/2");
+ Path p3 = new Path("/tmp/in2/2");
+ Path target = new Path("/tmp/out/1");
+ srcPaths.add(p1.getParent());
+ srcPaths.add(p3.getParent());
+ TestDistCpUtils.createFile(fs, "/tmp/in/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in/2");
+ TestDistCpUtils.createFile(fs, "/tmp/in2/2");
+ fs.mkdirs(target);
+ OutputStream out = fs.create(p1);
+ out.write("ABC".getBytes());
+ out.close();
+
+ out = fs.create(p2);
+ out.write("DEF".getBytes());
+ out.close();
+
+ out = fs.create(p3);
+ out.write("GHIJ".getBytes());
+ out.close();
+
+ Path listingFile = new Path("/tmp/file");
+
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ options.setSyncFolder(true);
+ CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Duplicates not detected");
+ } catch (DuplicateFileException ignore) {
+ }
+ Assert.assertEquals(10, listing.getBytesToCopy());
+ Assert.assertEquals(3, listing.getNumberOfPaths());
+ TestDistCpUtils.delete(fs, "/tmp");
+
+ try {
+ listing.buildListing(listingFile, options);
+ Assert.fail("Invalid input not detected");
+ } catch (InvalidInputException ignore) {
+ }
+ TestDistCpUtils.delete(fs, "/tmp");
+ } catch (IOException e) {
+ LOG.error("Exception encountered ", e);
+ Assert.fail("Test build listing failed");
+ } finally {
+ TestDistCpUtils.delete(fs, "/tmp");
+ }
+ }
+
+ @Test
+ public void testBuildListingForSingleFile() {
+ FileSystem fs = null;
+ String testRootString = "/singleFileListing";
+ Path testRoot = new Path(testRootString);
+ SequenceFile.Reader reader = null;
+ try {
+ fs = FileSystem.get(getConf());
+ if (fs.exists(testRoot))
+ TestDistCpUtils.delete(fs, testRootString);
+
+ Path sourceFile = new Path(testRoot, "/source/foo/bar/source.txt");
+ Path decoyFile = new Path(testRoot, "/target/moo/source.txt");
+ Path targetFile = new Path(testRoot, "/target/moo/target.txt");
+
+ TestDistCpUtils.createFile(fs, sourceFile.toString());
+ TestDistCpUtils.createFile(fs, decoyFile.toString());
+ TestDistCpUtils.createFile(fs, targetFile.toString());
+
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(sourceFile);
+
+ DistCpOptions options = new DistCpOptions(srcPaths, targetFile);
+ CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+ final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
+ listing.buildListing(listFile, options);
+
+ reader = new SequenceFile.Reader(fs, listFile, getConf());
+ FileStatus fileStatus = new FileStatus();
+ Text relativePath = new Text();
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertTrue(relativePath.toString().equals(""));
+ }
+ catch (Exception e) {
+ // Log before failing: Assert.fail() throws, so nothing after it would run.
+ LOG.error("Unexpected exception: ", e);
+ Assert.fail("Unexpected exception encountered.");
+ }
+ finally {
+ TestDistCpUtils.delete(fs, testRootString);
+ IOUtils.closeStream(reader);
+ }
+ }
+}