Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC

svn commit: r1236045 [2/5] - in /hadoop/common/trunk: hadoop-project/ hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/ hadoop-tools/hadoop-distcp/src/main/ hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.*;
+import java.util.Stack;
+
+/**
+ * The SimpleCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
+ */
+public class SimpleCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
+
+  private long totalPaths = 0;
+  private long totalBytesToCopy = 0;
+
+  /**
+   * Protected constructor, to initialize configuration.
+   *
+   * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null,
+   * delegation-token caching is skipped.
+   */
+  protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+  }
+
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+
+    Path targetPath = options.getTargetPath();
+    FileSystem targetFS = targetPath.getFileSystem(getConf());
+    boolean targetIsFile = targetFS.isFile(targetPath);
+
+    //If the target is a file, then the source has to be a single file.
+    if (targetIsFile) {
+      if (options.getSourcePaths().size() > 1) {
+        throw new InvalidInputException("Multiple sources being copied to a file: " +
+            targetPath);
+      }
+
+      Path srcPath = options.getSourcePaths().get(0);
+      FileSystem sourceFS = srcPath.getFileSystem(getConf());
+      if (!sourceFS.isFile(srcPath)) {
+        throw new InvalidInputException("Cannot copy " + srcPath +
+            ", which is not a file to " + targetPath);
+      }
+    }
+
+    if (options.shouldAtomicCommit() && targetFS.exists(targetPath)) {
+      throw new InvalidInputException("Target path for atomic-commit already exists: " +
+        targetPath + ". Cannot atomic-commit to pre-existing target-path.");
+    }
+
+    for (Path path: options.getSourcePaths()) {
+      FileSystem fs = path.getFileSystem(getConf());
+      if (!fs.exists(path)) {
+        throw new InvalidInputException(path + " doesn't exist");
+      }
+    }
+
+    /* This is required to allow map tasks to access each of the source
+       clusters. This retrieves the delegation token for each unique
+       file system and adds it to the job's private credential store.
+     */
+    Credentials credentials = getCredentials();
+    if (credentials != null) {
+      Path[] inputPaths = options.getSourcePaths().toArray(new Path[1]);
+      TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+
+    SequenceFile.Writer fileListWriter = null;
+
+    try {
+      fileListWriter = getWriter(pathToListingFile);
+
+      for (Path path: options.getSourcePaths()) {
+        FileSystem sourceFS = path.getFileSystem(getConf());
+        path = makeQualified(path);
+
+        FileStatus rootStatus = sourceFS.getFileStatus(path);
+        Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
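+        // FileStatus subclasses (e.g. those returned by the local FileSystem)
+        // are flagged here, so that writeToFileListing() can convert them to
+        // plain FileStatus instances before appending to the sequence-file,
+        // whose value-class is FileStatus.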
+        boolean localFile = (rootStatus.getClass() != FileStatus.class);
+
+        FileStatus[] sourceFiles = sourceFS.listStatus(path);
+        if (sourceFiles != null && sourceFiles.length > 0) {
+          for (FileStatus sourceStatus: sourceFiles) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+            }
+            writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+
+            if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+              }
+              traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+            }
+          }
+        } else {
+          writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
+        }
+      }
+    } finally {
+      IOUtils.closeStream(fileListWriter);
+    }
+  }
+
+  private Path computeSourceRootPath(FileStatus sourceStatus,
+                                     DistCpOptions options) throws IOException {
+
+    Path target = options.getTargetPath();
+    FileSystem targetFS = target.getFileSystem(getConf());
+
+    boolean solitaryFile = options.getSourcePaths().size() == 1
+                                                && !sourceStatus.isDirectory();
+
+    if (solitaryFile) {
+      if (targetFS.isFile(target) || !targetFS.exists(target)) {
+        return sourceStatus.getPath();
+      } else {
+        return sourceStatus.getPath().getParent();
+      }
+    } else {
+      boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetFS.exists(target)) ||
+          options.shouldSyncFolder() || options.shouldOverwrite();
+
+      return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() :
+          sourceStatus.getPath().getParent();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return totalBytesToCopy;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return totalPaths;
+  }
+
+  private Path makeQualified(Path path) throws IOException {
+    final FileSystem fs = path.getFileSystem(getConf());
+    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(getConf());
+    if (fs.exists(pathToListFile)) {
+      fs.delete(pathToListFile, false);
+    }
+    return SequenceFile.createWriter(getConf(),
+            SequenceFile.Writer.file(pathToListFile),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(FileStatus.class),
+            SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+  }
+
+  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+                                    FileStatus fileStatus) throws IOException {
+    return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
+  }
+
+  private static FileStatus[] getChildren(FileSystem fileSystem,
+                                         FileStatus parent) throws IOException {
+    return fileSystem.listStatus(parent.getPath());
+  }
+
+  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+                                         FileStatus sourceStatus,
+                                         Path sourcePathRoot, boolean localFile)
+                                         throws IOException {
+    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+    Stack<FileStatus> pathStack = new Stack<FileStatus>();
+    pathStack.push(sourceStatus);
+
+    while (!pathStack.isEmpty()) {
+      for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Recording source-path: "
+                    + sourceStatus.getPath() + " for copy.");
+        writeToFileListing(fileListWriter, child, sourcePathRoot, localFile);
+        if (isDirectoryAndNotEmpty(sourceFS, child)) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Traversing non-empty source dir: "
+                       + sourceStatus.getPath());
+          pathStack.push(child);
+        }
+      }
+    }
+  }
+
+  private void writeToFileListing(SequenceFile.Writer fileListWriter,
+                                  FileStatus fileStatus, Path sourcePathRoot,
+                                  boolean localFile) throws IOException {
+    if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory())
+      return; // Skip the root-paths.
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
+    }
+
+    FileStatus status = fileStatus;
+    if (localFile) {
+      status = getFileStatus(fileStatus);
+    }
+
+    fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath())), status);
+    fileListWriter.sync();
+
+    if (!fileStatus.isDirectory()) {
+      totalBytesToCopy += fileStatus.getLen();
+    }
+    totalPaths++;
+  }
+
+  private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
+  private DataInputBuffer in = new DataInputBuffer();
+  
+  private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
+    FileStatus status = new FileStatus();
+
+    buffer.reset();
+    DataOutputStream out = new DataOutputStream(buffer);
+    fileStatus.write(out);
+
+    in.reset(buffer.toByteArray(), 0, buffer.size());
+    status.readFields(in);
+    return status;
+  }
+}
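
For context, a minimal sketch of how a copy-listing is built with the classes added
in this commit, modelled on the calls made in CopyCommitter.deleteMissing() below.
The paths are hypothetical, and GlobbedCopyListing is assumed to delegate the
per-path expansion to SimpleCopyListing (whose constructor is protected):

  // Sketch only; assumes the usual imports from org.apache.hadoop.conf/fs/tools.
  Configuration conf = new Configuration();
  List<Path> sources = new ArrayList<Path>();
  sources.add(new Path("hdfs://nn1:8020/data/src"));        // hypothetical source
  DistCpOptions options =
      new DistCpOptions(sources, new Path("hdfs://nn2:8020/data/dst"));  // hypothetical target

  CopyListing listing = new GlobbedCopyListing(conf, null);  // null => skip token caching
  listing.buildListing(new Path("/tmp/distcp-listing.seq"), options);    // hypothetical listing file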

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.tools.*;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.util.DistCpUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+/**
+ * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
+ * responsible for handling the completion/cleanup of the DistCp run.
+ * Specifically, it does the following:
+ *  1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.)
+ *  2. Preservation of user/group/replication-factor on any directories that
+ *     have been copied. (Files are taken care of in their map-tasks.)
+ *  3. Atomic-move of data from the temporary work-folder to the final path
+ *     (if atomic-commit was opted for).
+ *  4. Deletion of files from the target that are missing at source (if opted for).
+ *  5. Cleanup of any partially-copied files from previous, failed attempts.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+  private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+
+  private final TaskAttemptContext taskAttemptContext;
+
+  /**
+   * Create an output committer
+   *
+   * @param outputPath the job's output path
+   * @param context    the task's context
+   * @throws IOException - Exception if any
+   */
+  public CopyCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.taskAttemptContext = context;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    Configuration conf = jobContext.getConfiguration();
+    super.commitJob(jobContext);
+
+    cleanupTempFiles(jobContext);
+
+    String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+    if (attributes != null && !attributes.isEmpty()) {
+      preserveFileAttributesForDirectories(conf);
+    }
+
+    try {
+      if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+        deleteMissing(conf);
+      } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
+        commitData(conf);
+      }
+      taskAttemptContext.setStatus("Commit Successful");
+    }
+    finally {
+      cleanup(conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void abortJob(JobContext jobContext,
+                       JobStatus.State state) throws IOException {
+    try {
+      super.abortJob(jobContext, state);
+    } finally {
+      cleanupTempFiles(jobContext);
+      cleanup(jobContext.getConfiguration());
+    }
+  }
+
+  private void cleanupTempFiles(JobContext context) {
+    try {
+      Configuration conf = context.getConfiguration();
+
+      Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+      FileSystem targetFS = targetWorkPath.getFileSystem(conf);
+
+      String jobId = context.getJobID().toString();
+      deleteAttemptTempFiles(targetWorkPath, targetFS, jobId);
+      deleteAttemptTempFiles(targetWorkPath.getParent(), targetFS, jobId);
+    } catch (Throwable t) {
+      LOG.warn("Unable to cleanup temp files", t);
+    }
+  }
+
+  private void deleteAttemptTempFiles(Path targetWorkPath,
+                                      FileSystem targetFS,
+                                      String jobId) throws IOException {
+
+    FileStatus[] tempFiles = targetFS.globStatus(
+        new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
+
+    if (tempFiles != null && tempFiles.length > 0) {
+      for (FileStatus file : tempFiles) {
+        LOG.info("Cleaning up " + file.getPath());
+        targetFS.delete(file.getPath(), false);
+      }
+    }
+  }
+
+  /**
+   * Cleanup meta folder and other temporary files
+   *
+   * @param conf - Job Configuration
+   */
+  private void cleanup(Configuration conf) {
+    Path metaFolder = new Path(conf.get(DistCpConstants.CONF_LABEL_META_FOLDER));
+    try {
+      FileSystem fs = metaFolder.getFileSystem(conf);
+      LOG.info("Cleaning up temporary work folder: " + metaFolder);
+      fs.delete(metaFolder, true);
+    } catch (IOException ignore) {
+      LOG.error("Exception encountered ", ignore);
+    }
+  }
+
+  // This method changes the target directories' file-attributes (ownership,
+  // group, permissions, etc.) based on the corresponding source directories.
+  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+    String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+    LOG.info("About to preserve attributes: " + attrSymbols);
+
+    EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
+
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+                                      SequenceFile.Reader.file(sourceListing));
+    long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
+
+    Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    long preservedEntries = 0;
+    try {
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+
+      // Iterate over every source path that was copied.
+      while (sourceReader.next(srcRelPath, srcFileStatus)) {
+        // File-attributes for files are set at the time of copy,
+        // in the map-task.
+        if (! srcFileStatus.isDirectory()) continue;
+
+        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+
+        // Skip the root folder.
+        // Status can't be preserved on root-folder. (E.g. multiple paths may
+        // be copied to a single target folder. Which source-attributes to use
+        // on the target is undefined.)
+        if (targetRoot.equals(targetFile)) continue;
+
+        FileSystem targetFS = targetFile.getFileSystem(conf);
+        DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes);
+        preservedEntries++;
+
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Preserving status on directory entries. [" +
+            sourceReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+    }
+    LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
+  }
+
+  // This method deletes "extra" files from the target, if they're not
+  // available at the source.
+  private void deleteMissing(Configuration conf) throws IOException {
+    LOG.info("-delete option is enabled. About to remove entries from " +
+        "target that are missing in source");
+
+    // Sort the source-file listing alphabetically.
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+
+    // Similarly, create the listing of target-files. Sort alphabetically.
+    Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
+    CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+
+    List<Path> targets = new ArrayList<Path>(1);
+    Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    targets.add(targetFinalPath);
+    DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
+
+    target.buildListing(targetListing, options);
+    Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+    long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
+
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+                                 SequenceFile.Reader.file(sortedSourceListing));
+    SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
+                                 SequenceFile.Reader.file(sortedTargetListing));
+
+    // Walk both source and target file listings.
+    // Delete everything from the target that doesn't also exist at the source.
+    long deletedEntries = 0;
+    try {
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      FileStatus trgtFileStatus = new FileStatus();
+      Text trgtRelPath = new Text();
+
+      FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+      boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+      while (targetReader.next(trgtRelPath, trgtFileStatus)) {
+        // Skip sources that don't exist on target.
+        while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
+          srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+        }
+
+        if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
+
+        // Target doesn't exist at source. Delete.
+        boolean result = (!targetFS.exists(trgtFileStatus.getPath()) ||
+            targetFS.delete(trgtFileStatus.getPath(), true));
+        if (result) {
+          LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
+          deletedEntries++;
+        } else {
+          throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+        }
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Deleting missing files from target. [" +
+            targetReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+      IOUtils.closeStream(targetReader);
+    }
+    LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+  }
+
+  private void commitData(Configuration conf) throws IOException {
+
+    Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    FileSystem targetFS = workDir.getFileSystem(conf);
+
+    LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);
+    if (targetFS.exists(finalDir) && targetFS.exists(workDir)) {
+      LOG.error("Pre-existing final-path found at: " + finalDir);
+      throw new IOException("Target-path can't be committed to because it " +
+          "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
+    }
+
+    boolean result = targetFS.rename(workDir, finalDir);
+    if (!result) {
+      LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
+      result = targetFS.exists(finalDir) && !targetFS.exists(workDir);
+    }
+    if (result) {
+      LOG.info("Data committed successfully to " + finalDir);
+      taskAttemptContext.setStatus("Data committed successfully to " + finalDir);
+    } else {
+      LOG.error("Unable to commit data to " + finalDir);
+      throw new IOException("Atomic commit failed. Temporary data in " + workDir +
+        ", Unable to move to " + finalDir);
+    }
+  }
+}
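
For reference, a sketch of the configuration entries commitJob() above consults.
These are normally populated by the DistCp driver rather than by hand; the label
constants come from DistCpConstants, and the values below are hypothetical:

  Configuration conf = job.getConfiguration();               // 'job' is the DistCp Job
  conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "/data/.distcp.work");
  conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "/data/final");
  conf.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "/tmp/distcp-listing.seq");
  conf.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/tmp/distcp-meta");
  conf.set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, "ugp");    // assumed symbols: user/group/permission
  conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);  // or CONF_LABEL_DELETE_MISSING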

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.*;
+import java.util.EnumSet;
+import java.util.Arrays;
+
+/**
+ * Mapper class that executes the DistCp copy operation.
+ * Extends the o.a.h.mapreduce.Mapper<> class.
+ */
+public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
+
+  /**
+   * Hadoop counters for the DistCp CopyMapper.
+   * (These have been kept identical to the old DistCp,
+   * for backward compatibility.)
+   */
+  public static enum Counter {
+    COPY,         // Number of files received by the mapper for copy.
+    SKIP,         // Number of files skipped.
+    FAIL,         // Number of files that failed to be copied.
+    BYTESCOPIED,  // Number of bytes actually copied by the copy-mapper, total.
+    BYTESEXPECTED,// Number of bytes expected to be copied.
+    BYTESFAILED,  // Number of bytes that failed to be copied.
+    BYTESSKIPPED, // Number of bytes that were skipped from copy.
+  }
+
+  private static Log LOG = LogFactory.getLog(CopyMapper.class);
+
+  private Configuration conf;
+
+  private boolean syncFolders = false;
+  private boolean ignoreFailures = false;
+  private boolean skipCrc = false;
+  private boolean overWrite = false;
+  private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
+
+  private FileSystem targetFS = null;
+  private Path    targetWorkPath = null;
+
+  /**
+   * Implementation of the Mapper::setup() method. This extracts the DistCp
+   * options specified in the Job's configuration, to set up the map task.
+   * @param context Mapper's context.
+   * @throws IOException On IO failure.
+   * @throws InterruptedException If the job is interrupted.
+   */
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+
+    syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
+    ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+    skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
+    overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+    preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
+        PRESERVE_STATUS.getConfigLabel()));
+
+    targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    Path targetFinalPath = new Path(conf.get(
+            DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    targetFS = targetFinalPath.getFileSystem(conf);
+
+    if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
+      overWrite = true; // When target is an existing file, overwrite it.
+    }
+
+    if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
+      initializeSSLConf(context);
+    }
+  }
+
+  /**
+   * Initialize the SSL configuration, if it is specified in the job configuration.
+   *
+   * @throws IOException - If any
+   */
+  private void initializeSSLConf(Context context) throws IOException {
+    LOG.info("Initializing SSL configuration");
+    
+    String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
+    Path[] cacheFiles = context.getLocalCacheFiles();
+
+    Configuration sslConfig = new Configuration(false);
+    String sslConfFileName = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
+    Path sslClient = findCacheFile(cacheFiles, sslConfFileName);
+    if (sslClient == null) {
+      LOG.warn("SSL Client config file not found. Was looking for " + sslConfFileName +
+          " in " + Arrays.toString(cacheFiles));
+      return;
+    }
+    sslConfig.addResource(sslClient);
+
+    String trustStoreFile = conf.get("ssl.client.truststore.location");
+    Path trustStorePath = findCacheFile(cacheFiles, trustStoreFile);
+    sslConfig.set("ssl.client.truststore.location", trustStorePath.toString());
+
+    String keyStoreFile = conf.get("ssl.client.keystore.location");
+    Path keyStorePath = findCacheFile(cacheFiles, keyStoreFile);
+    sslConfig.set("ssl.client.keystore.location", keyStorePath.toString());
+
+    try {
+      OutputStream out = new FileOutputStream(workDir + "/" + sslConfFileName);
+      try {
+        sslConfig.writeXml(out);
+      } finally {
+        out.close();
+      }
+      conf.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfFileName);
+    } catch (IOException e) {
+      LOG.warn("Unable to write out the ssl configuration. " +
+          "Will fall back to default ssl-client.xml in class path, if there is one", e);
+    }
+  }
+
+  /**
+   * Find entry from distributed cache
+   *
+   * @param cacheFiles - All localized cache files
+   * @param fileName - fileName to search
+   * @return Path of the filename if found, else null
+   */
+  private Path findCacheFile(Path[] cacheFiles, String fileName) {
+    if (cacheFiles != null && cacheFiles.length > 0) {
+      for (Path file : cacheFiles) {
+        if (file.getName().equals(fileName)) {
+          return file;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Implementation of the Mapper<>::map(). Does the copy.
+   * @param relPath The target path, relative to the target root.
+   * @param sourceFileStatus The FileStatus of the source path.
+   * @throws IOException On IO failure.
+   */
+  @Override
+  public void map(Text relPath, FileStatus sourceFileStatus, Context context)
+          throws IOException, InterruptedException {
+    Path sourcePath = sourceFileStatus.getPath();
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
+
+    Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
+                          targetFS.getWorkingDirectory()) + relPath.toString());
+
+    EnumSet<DistCpOptions.FileAttribute> fileAttributes
+            = getFileAttributeSettings(context);
+
+    final String description = "Copying " + sourcePath + " to " + target;
+    context.setStatus(description);
+
+    LOG.info(description);
+
+    try {
+      FileStatus sourceCurrStatus;
+      FileSystem sourceFS;
+      try {
+        sourceFS = sourcePath.getFileSystem(conf);
+        sourceCurrStatus = sourceFS.getFileStatus(sourcePath);
+      } catch (FileNotFoundException e) {
+        throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
+      }
+
+      FileStatus targetStatus = null;
+
+      try {
+        targetStatus = targetFS.getFileStatus(target);
+      } catch (FileNotFoundException ignore) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Path could not be found: " + target, ignore);
+      }
+
+      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
+        throw new IOException("Can't replace " + target + ". Target is " +
+            getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
+      }
+
+      if (sourceCurrStatus.isDirectory()) {
+        createTargetDirsWithRetry(description, target, context);
+        return;
+      }
+
+      if (skipFile(sourceFS, sourceCurrStatus, target)) {
+        LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+                 + " to " + target);
+        updateSkipCounters(context, sourceCurrStatus);
+        context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
+      }
+      else {
+        copyFileWithRetry(description, sourceCurrStatus, target, context,
+                          fileAttributes);
+      }
+
+      DistCpUtils.preserve(target.getFileSystem(conf), target,
+                           sourceCurrStatus, fileAttributes);
+
+    } catch (IOException exception) {
+      handleFailures(exception, sourceFileStatus, target, context);
+    }
+  }
+
+  private String getFileType(FileStatus fileStatus) {
+    return fileStatus == null ? "N/A" : (fileStatus.isDirectory() ? "dir" : "file");
+  }
+
+  private static EnumSet<DistCpOptions.FileAttribute>
+          getFileAttributeSettings(Mapper.Context context) {
+    String attributeString = context.getConfiguration().get(
+            DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel());
+    return DistCpUtils.unpackAttributes(attributeString);
+  }
+
+  private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
+               Path target, Context context,
+               EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
+
+    long bytesCopied;
+    try {
+      bytesCopied = (Long)new RetriableFileCopyCommand(description)
+                       .execute(sourceFileStatus, target, context, fileAttributes);
+    } catch (Exception e) {
+      context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
+      throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
+          " --> " + target, e);
+    }
+    incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
+    incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
+    incrementCounter(context, Counter.COPY, 1);
+  }
+
+  private void createTargetDirsWithRetry(String description,
+                   Path target, Context context) throws IOException {
+    try {
+      new RetriableDirectoryCreateCommand(description).execute(target, context);
+    } catch (Exception e) {
+      throw new IOException("mkdir failed for " + target, e);
+    }
+    incrementCounter(context, Counter.COPY, 1);
+  }
+
+  private static void updateSkipCounters(Context context,
+                                         FileStatus sourceFile) {
+    incrementCounter(context, Counter.SKIP, 1);
+    incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
+  }
+
+  private void handleFailures(IOException exception,
+                                     FileStatus sourceFileStatus, Path target,
+                                     Context context) throws IOException, InterruptedException {
+    LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
+                target, exception);
+
+    if (ignoreFailures && exception.getCause() instanceof
+            RetriableFileCopyCommand.CopyReadException) {
+      incrementCounter(context, Counter.FAIL, 1);
+      incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
+      context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " + 
+          StringUtils.stringifyException(exception)));
+    }
+    else
+      throw exception;
+  }
+
+  private static void incrementCounter(Context context, Counter counter,
+                                       long value) {
+    context.getCounter(counter).increment(value);
+  }
+
+  private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
+                                          throws IOException {
+    return     targetFS.exists(target)
+            && !overWrite
+            && !mustUpdate(sourceFS, source, target);
+  }
+
+  private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
+                                    throws IOException {
+    final FileStatus targetFileStatus = targetFS.getFileStatus(target);
+
+    return     syncFolders
+            && (
+                   targetFileStatus.getLen() != source.getLen()
+                || (!skipCrc &&
+                       !DistCpUtils.checksumsAreEqual(sourceFS,
+                                          source.getPath(), targetFS, target))
+                || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
+                      preserve.contains(FileAttribute.BLOCKSIZE))
+               );
+  }
+}
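
As a rough illustration of the Counter enum above, the totals can be read off a
completed Job with the standard counters API (a sketch; 'job' is assumed to be the
finished DistCp Job, and Job.getCounters() may throw IOException):

  Counters counters = job.getCounters();
  long filesCopied  = counters.findCounter(CopyMapper.Counter.COPY).getValue();
  long filesSkipped = counters.findCounter(CopyMapper.Counter.SKIP).getValue();
  long bytesCopied  = counters.findCounter(CopyMapper.Counter.BYTESCOPIED).getValue();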

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.DistCpConstants;
+
+import java.io.IOException;
+
+/**
+ * The CopyOutputFormat is the Hadoop OutputFormat used in DistCp.
+ * It sets up the Job's Configuration (in the Job-Context) with the settings
+ * for the work-directory, final commit-directory, etc. It also sets the right
+ * output-committer.
+ * @param <K> Key type of the output records.
+ * @param <V> Value type of the output records.
+ */
+public class CopyOutputFormat<K, V> extends TextOutputFormat<K, V> {
+
+  /**
+   * Setter for the working directory for DistCp (where files will be copied
+   * before they are moved to the final commit-directory.)
+   * @param job The Job on whose configuration the working-directory is to be set.
+   * @param workingDirectory The path to use as the working directory.
+   */
+  public static void setWorkingDirectory(Job job, Path workingDirectory) {
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+        workingDirectory.toString());
+  }
+
+  /**
+   * Setter for the final directory for DistCp (where files copied will be
+   * moved, atomically.)
+   * @param job The Job on whose configuration the commit-directory is to be set.
+   * @param commitDirectory The path to use for final commit.
+   */
+  public static void setCommitDirectory(Job job, Path commitDirectory) {
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+        commitDirectory.toString());
+  }
+
+  /**
+   * Getter for the working directory.
+   * @param job The Job from whose configuration the working-directory is to
+   * be retrieved.
+   * @return The working-directory Path.
+   */
+  public static Path getWorkingDirectory(Job job) {
+    return getWorkingDirectory(job.getConfiguration());
+  }
+
+  private static Path getWorkingDirectory(Configuration conf) {
+    String workingDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
+    if (workingDirectory == null || workingDirectory.isEmpty()) {
+      return null;
+    } else {
+      return new Path(workingDirectory);
+    }
+  }
+
+  /**
+   * Getter for the final commit-directory.
+   * @param job The Job from whose configuration the commit-directory is to be
+   * retrieved.
+   * @return The commit-directory Path.
+   */
+  public static Path getCommitDirectory(Job job) {
+    return getCommitDirectory(job.getConfiguration());
+  }
+
+  private static Path getCommitDirectory(Configuration conf) {
+    String commitDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH);
+    if (commitDirectory == null || commitDirectory.isEmpty()) {
+      return null;
+    } else {
+      return new Path(commitDirectory);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+    return new CopyCommitter(getOutputPath(context), context);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    if (getCommitDirectory(conf) == null) {
+      throw new IllegalStateException("Commit directory not configured");
+    }
+
+    Path workingPath = getWorkingDirectory(conf);
+    if (workingPath == null) {
+      throw new IllegalStateException("Working directory not configured");
+    }
+
+    // get delegation tokens for the working directory's file system
+    TokenCache.obtainTokensForNamenodes(context.getCredentials(),
+                                        new Path[] {workingPath}, conf);
+  }
+}
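
For reference, a minimal sketch of the job wiring this OutputFormat expects, based
on its setters and checkOutputSpecs() above. The real wiring is done by the DistCp
driver; the directory paths here are hypothetical:

  Job job = new Job(new Configuration(), "distcp");   // may throw IOException
  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setInputFormatClass(UniformSizeInputFormat.class);
  job.setOutputFormatClass(CopyOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  CopyOutputFormat.setWorkingDirectory(job, new Path("/data/.distcp.work"));  // hypothetical
  CopyOutputFormat.setCommitDirectory(job, new Path("/data/final"));          // hypothetical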

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.tools.util.RetriableCommand;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * This class extends RetriableCommand to implement the creation of directories
+ * with retries on failure.
+ */
+public class RetriableDirectoryCreateCommand extends RetriableCommand {
+
+  /**
+   * Constructor, taking a description of the action.
+   * @param description Verbose description of the copy operation.
+   */
+  public RetriableDirectoryCreateCommand(String description) {
+    super(description);
+  }
+
+  /**
+   * Implementation of RetriableCommand::doExecute().
+   * This implements the actual mkdirs() functionality.
+   * @param arguments Argument-list to the command.
+   * @return Boolean. True, if the directory could be created successfully.
+   * @throws Exception IOException, on failure to create the directory.
+   */
+  @Override
+  protected Object doExecute(Object... arguments) throws Exception {
+    assert arguments.length == 2 : "Unexpected argument list.";
+    Path target = (Path)arguments[0];
+    Mapper.Context context = (Mapper.Context)arguments[1];
+
+    FileSystem targetFS = target.getFileSystem(context.getConfiguration());
+    return targetFS.mkdirs(target);
+  }
+}
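
Invocation mirrors createTargetDirsWithRetry() in CopyMapper above; a sketch, with
'target' a Path and 'context' the Mapper.Context:

  boolean created = (Boolean) new RetriableDirectoryCreateCommand("mkdir " + target)
                                  .execute(target, context);   // execute() applies the retry behaviour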

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.tools.util.RetriableCommand;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.DistCpOptions.*;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.EnumSet;
+
+/**
+ * This class extends RetriableCommand to implement the copy of files,
+ * with retries on failure.
+ */
+public class RetriableFileCopyCommand extends RetriableCommand {
+
+  private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
+  private static int BUFFER_SIZE = 8 * 1024;
+
+  /**
+   * Constructor, taking a description of the action.
+   * @param description Verbose description of the copy operation.
+   */
+  public RetriableFileCopyCommand(String description) {
+    super(description);
+  }
+
+  /**
+   * Implementation of RetriableCommand::doExecute().
+   * This is the actual copy-implementation.
+   * @param arguments Argument-list to the command.
+   * @return Number of bytes copied.
+   * @throws Exception CopyReadException, if there are read failures. All other
+   *         failures are IOExceptions.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Object doExecute(Object... arguments) throws Exception {
+    assert arguments.length == 4 : "Unexpected argument list.";
+    FileStatus source = (FileStatus)arguments[0];
+    assert !source.isDirectory() : "Unexpected file-status. Expected file.";
+    Path target = (Path)arguments[1];
+    Mapper.Context context = (Mapper.Context)arguments[2];
+    EnumSet<FileAttribute> fileAttributes
+            = (EnumSet<FileAttribute>)arguments[3];
+    return doCopy(source, target, context, fileAttributes);
+  }
+
+  private long doCopy(FileStatus sourceFileStatus, Path target,
+                      Mapper.Context context,
+                      EnumSet<FileAttribute> fileAttributes)
+          throws IOException {
+
+    Path tmpTargetPath = getTmpFile(target, context);
+    final Configuration configuration = context.getConfiguration();
+    FileSystem targetFS = target.getFileSystem(configuration);
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
+        LOG.debug("Tmp-file path: " + tmpTargetPath);
+      }
+      FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
+              configuration);
+      long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
+                                     context, fileAttributes);
+
+      compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead);
+      compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath);
+      promoteTmpToTarget(tmpTargetPath, target, targetFS);
+      return bytesRead;
+
+    } finally {
+      if (targetFS.exists(tmpTargetPath))
+        targetFS.delete(tmpTargetPath, false);
+    }
+  }
+
+  private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
+                             FileStatus sourceFileStatus, Mapper.Context context,
+                             EnumSet<FileAttribute> fileAttributes)
+                             throws IOException {
+    OutputStream outStream = new BufferedOutputStream(targetFS.create(
+            tmpTargetPath, true, BUFFER_SIZE,
+            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS),
+            getBlockSize(fileAttributes, sourceFileStatus, targetFS), context));
+    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
+  }
+
+  private void compareFileLengths(FileStatus sourceFileStatus, Path target,
+                                  Configuration configuration, long bytesRead)
+                                  throws IOException {
+    final Path sourcePath = sourceFileStatus.getPath();
+    FileSystem fs = sourcePath.getFileSystem(configuration);
+    if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+      throw new IOException("Mismatch in length of source:" + sourcePath
+                + " and target:" + target);
+  }
+
+  private void compareCheckSums(FileSystem sourceFS, Path source,
+                                FileSystem targetFS, Path target)
+                                throws IOException {
+    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
+      throw new IOException("Check-sum mismatch between "
+                              + source + " and " + target);
+
+  }
+
+  //If target file exists and unable to delete target - fail
+  //If target doesn't exist and unable to create parent folder - fail
+  //If target is successfully deleted and parent exists, if rename fails - fail
+  private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs)
+                                  throws IOException {
+    if ((fs.exists(target) && !fs.delete(target, false))
+        || (!fs.exists(target.getParent()) && !fs.mkdirs(target.getParent()))
+        || !fs.rename(tmpTarget, target)) {
+      throw new IOException("Failed to promote tmp-file:" + tmpTarget
+                              + " to: " + target);
+    }
+  }
+
+  private Path getTmpFile(Path target, Mapper.Context context) {
+    Path targetWorkPath = new Path(context.getConfiguration().
+        get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
+    LOG.info("Creating temp file: " +
+        new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
+    return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
+  }
+
+  private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
+                         int bufferSize, boolean mustCloseStream,
+                         Mapper.Context context) throws IOException {
+    Path source = sourceFileStatus.getPath();
+    byte buf[] = new byte[bufferSize];
+    ThrottledInputStream inStream = null;
+    long totalBytesRead = 0;
+
+    try {
+      inStream = getInputStream(source, context.getConfiguration());
+      int bytesRead = readBytes(inStream, buf);
+      while (bytesRead >= 0) {
+        totalBytesRead += bytesRead;
+        outStream.write(buf, 0, bytesRead);
+        updateContextStatus(totalBytesRead, context, sourceFileStatus);
+        bytesRead = readBytes(inStream, buf);
+      }
+    } finally {
+      if (mustCloseStream)
+        IOUtils.cleanup(LOG, outStream, inStream);
+    }
+
+    return totalBytesRead;
+  }
+
+  private void updateContextStatus(long totalBytesRead, Mapper.Context context,
+                                   FileStatus sourceFileStatus) {
+    StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
+                .format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
+    message.append("% ")
+            .append(description).append(" [")
+            .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
+            .append('/')
+        .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
+            .append(']');
+    context.setStatus(message.toString());
+  }
+
+  private static int readBytes(InputStream inStream, byte buf[])
+          throws IOException {
+    try {
+      return inStream.read(buf);
+    }
+    catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static ThrottledInputStream getInputStream(Path path, Configuration conf)
+          throws IOException {
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+              DistCpConstants.DEFAULT_BANDWIDTH_MB);
+      return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
+              bandwidthMB * 1024 * 1024);
+    }
+    catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static short getReplicationFactor(
+          EnumSet<FileAttribute> fileAttributes,
+          FileStatus sourceFile, FileSystem targetFS) {
+    return fileAttributes.contains(FileAttribute.REPLICATION)?
+            sourceFile.getReplication() : targetFS.getDefaultReplication();
+  }
+
+  private static long getBlockSize(
+          EnumSet<FileAttribute> fileAttributes,
+          FileStatus sourceFile, FileSystem targetFS) {
+    return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
+            sourceFile.getBlockSize() : targetFS.getDefaultBlockSize();
+  }
+
+  /**
+   * Special subclass of IOException. This is used to distinguish read-operation
+   * failures from other kinds of IOExceptions.
+   * The failure to read from source is dealt with specially, in the CopyMapper.
+   * Such failures may be skipped if the DistCpOptions indicate so.
+   * Write failures are intolerable, and amount to CopyMapper failure.  
+   */
+  public static class CopyReadException extends IOException {
+    public CopyReadException(Throwable rootCause) {
+      super(rootCause);
+    }
+  }
+}
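
A note on the bandwidth throttling used by copyBytes() above: the per-map cap is
read from DistCpConstants.CONF_LABEL_BANDWIDTH_MB, converted to a byte budget
(bandwidthMB * 1024 * 1024), and handed to ThrottledInputStream. As a rough,
standalone sketch of the idea only (the class name ThrottledCopySketch and its
sleep-based pacing are assumptions for illustration, not the actual
ThrottledInputStream implementation):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    class ThrottledCopySketch {
      // Copies bytes from 'in' to 'out', sleeping whenever the observed average
      // rate exceeds maxBytesPerSec. Returns the total number of bytes copied.
      static long copy(InputStream in, OutputStream out,
                       long maxBytesPerSec, int bufferSize)
          throws IOException, InterruptedException {
        byte[] buf = new byte[bufferSize];
        long startMs = System.currentTimeMillis();
        long total = 0;
        int read;
        while ((read = in.read(buf)) >= 0) {
          out.write(buf, 0, read);
          total += read;
          // If we're running ahead of the budget, pause until the average
          // rate drops back under maxBytesPerSec.
          long elapsedMs = Math.max(1, System.currentTimeMillis() - startMs);
          while ((total * 1000.0) / elapsedMs > maxBytesPerSec) {
            Thread.sleep(50);
            elapsedMs = Math.max(1, System.currentTimeMillis() - startMs);
          }
        }
        return total;
      }
    }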

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * UniformSizeInputFormat extends the InputFormat<> class, to produce
+ * input-splits for DistCp.
+ * It looks at the copy-listing and groups the contents into input-splits such
+ * that the total-number of bytes to be copied for each input split is
+ * uniform.
+ */
+public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
+  private static final Log LOG
+                = LogFactory.getLog(UniformSizeInputFormat.class);
+
+  /**
+   * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
+   * such that the number of bytes to be copied for all the splits are
+   * approximately equal.
+   * @param context JobContext for the job.
+   * @return The list of uniformly-distributed input-splits.
+   * @throws IOException: On failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+                      throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    int numSplits = DistCpUtils.getInt(configuration,
+                                       JobContext.NUM_MAPS);
+
+    if (numSplits == 0) return new ArrayList<InputSplit>();
+
+    return getSplits(configuration, numSplits,
+                     DistCpUtils.getLong(configuration,
+                          DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
+  }
+
+  private List<InputSplit> getSplits(Configuration configuration, int numSplits,
+                                     long totalSizeBytes) throws IOException {
+    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+    long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
+
+    FileStatus srcFileStatus = new FileStatus();
+    Text srcRelPath = new Text();
+    long currentSplitSize = 0;
+    long lastSplitStart = 0;
+    long lastPosition = 0;
+
+    final Path listingFilePath = getListingFilePath(configuration);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Average bytes per map: " + nBytesPerSplit +
+          ", Number of maps: " + numSplits + ", total size: " + totalSizeBytes);
+    }
+    SequenceFile.Reader reader = null;
+    try {
+      reader = getListingFileReader(configuration);
+      while (reader.next(srcRelPath, srcFileStatus)) {
+        // If adding the current file would push this split past the
+        // per-split byte limit, close off the current split and start a new
+        // one with this file.
+        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
+          FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+              lastPosition - lastSplitStart, null);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+          }
+          splits.add(split);
+          lastSplitStart = lastPosition;
+          currentSplitSize = 0;
+        }
+        currentSplitSize += srcFileStatus.getLen();
+        lastPosition = reader.getPosition();
+      }
+      if (lastPosition > lastSplitStart) {
+        FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+            lastPosition - lastSplitStart, null);
+        if (LOG.isDebugEnabled()) {
+          LOG.info ("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+        }
+        splits.add(split);
+      }
+
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+
+    return splits;
+  }
+
+  private static Path getListingFilePath(Configuration configuration) {
+    final String listingFilePathString =
+            configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+    assert !listingFilePathString.equals("")
+              : "Couldn't find listing file. Invalid input.";
+    return new Path(listingFilePathString);
+  }
+
+  private SequenceFile.Reader getListingFileReader(Configuration configuration) {
+
+    final Path listingFilePath = getListingFilePath(configuration);
+    try {
+      final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
+      if (!fileSystem.exists(listingFilePath))
+        throw new IllegalArgumentException("Listing file doesn't exist at: "
+                                           + listingFilePath);
+
+      return new SequenceFile.Reader(configuration,
+                                     SequenceFile.Reader.file(listingFilePath));
+    }
+    catch (IOException exception) {
+      LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
+      throw new IllegalArgumentException("Couldn't find listing-file at: "
+                                         + listingFilePath, exception);
+    }
+  }
+
+  /**
+   * Implementation of InputFormat::createRecordReader().
+   * @param split The split for which the RecordReader is sought.
+   * @param context The context of the current task-attempt.
+   * @return A SequenceFileRecordReader instance (since the copy-listing is a
+   * simple sequence-file).
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
+                                                     TaskAttemptContext context)
+                                      throws IOException, InterruptedException {
+    return new SequenceFileRecordReader<Text, FileStatus>();
+  }
+}
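
To make the grouping arithmetic in getSplits() concrete: files are taken in
listing order, and a new split is started whenever adding the next file would
push the running byte-count past ceil(totalBytes / numSplits). A small
standalone sketch of the same idea (the class name UniformGroupingSketch and the
example sizes are invented for illustration; the real InputFormat walks the
sequence-file listing rather than an in-memory array):

    import java.util.ArrayList;
    import java.util.List;

    class UniformGroupingSketch {
      // Groups consecutive file sizes so that each group carries roughly
      // totalBytes / numSplits bytes.
      static List<List<Long>> group(long[] fileSizes, int numSplits) {
        long totalBytes = 0;
        for (long size : fileSizes) totalBytes += size;
        long bytesPerSplit = (long) Math.ceil(totalBytes * 1.0 / numSplits);

        List<List<Long>> splits = new ArrayList<List<Long>>();
        List<Long> current = new ArrayList<Long>();
        long currentSize = 0;
        for (long size : fileSizes) {
          // Start a new split once the running total would exceed the target.
          if (currentSize + size > bytesPerSplit && !current.isEmpty()) {
            splits.add(current);
            current = new ArrayList<Long>();
            currentSize = 0;
          }
          current.add(size);
          currentSize += size;
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
      }

      public static void main(String[] args) {
        // Six files, three maps => roughly 200 bytes per split.
        long[] sizes = {120, 80, 40, 160, 100, 100};
        System.out.println(group(sizes, 3));  // [[120, 80], [40, 160], [100, 100]]
      }
    }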

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+import java.io.IOException;
+
+/**
+ * The DynamicInputChunk represents a single chunk of work, when used in
+ * conjunction with the DynamicInputFormat and the DynamicRecordReader.
+ * The records in the DynamicInputFormat's input-file are split across various
+ * DynamicInputChunks. Each one is claimed and processed in an iteration of
+ * a dynamic-mapper. When a DynamicInputChunk has been exhausted, a faster
+ * mapper may claim and process another, until no chunks remain to be
+ * consumed.
+ */
+class DynamicInputChunk<K, V> {
+  private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
+
+  private static Configuration configuration;
+  private static Path chunkRootPath;
+  private static String chunkFilePrefix;
+  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
+  private static FileSystem fs;
+
+  private Path chunkFilePath;
+  private SequenceFileRecordReader<K, V> reader;
+  private SequenceFile.Writer writer;
+
+  private static void initializeChunkInvariants(Configuration config)
+                                                  throws IOException {
+    configuration = config;
+    Path listingFilePath = new Path(getListingFilePath(configuration));
+    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
+    fs = chunkRootPath.getFileSystem(configuration);
+    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
+  }
+
+  private static String getListingFilePath(Configuration configuration) {
+    final String listingFileString = configuration.get(
+            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+    assert !listingFileString.equals("") : "Listing file not found.";
+    return listingFileString;
+  }
+
+  private static boolean areInvariantsInitialized() {
+    return chunkRootPath != null;
+  }
+
+  private DynamicInputChunk(String chunkId, Configuration configuration)
+                                                      throws IOException {
+    if (!areInvariantsInitialized())
+      initializeChunkInvariants(configuration);
+
+    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+    openForWrite();
+  }
+
+
+  private void openForWrite() throws IOException {
+    writer = SequenceFile.createWriter(
+            chunkFilePath.getFileSystem(configuration), configuration,
+            chunkFilePath, Text.class, FileStatus.class,
+            SequenceFile.CompressionType.NONE);
+
+  }
+
+  /**
+   * Factory method to create chunk-files for writing to.
+   * (For instance, when the DynamicInputFormat splits the input-file into
+   * chunks.)
+   * @param chunkId String to identify the chunk.
+   * @param configuration Configuration, describing the location of the listing-
+   * file, file-system for the map-job, etc.
+   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
+   * incorporating the chunk-id.
+   * @throws IOException Exception on failure to create the chunk.
+   */
+  public static DynamicInputChunk createChunkForWrite(String chunkId,
+                          Configuration configuration) throws IOException {
+    return new DynamicInputChunk(chunkId, configuration);
+  }
+
+  /**
+   * Method to write records into a chunk.
+   * @param key Key from the listing file.
+   * @param value Corresponding value from the listing file.
+   * @throws IOException Exception on failure to write to the file.
+   */
+  public void write(Text key, FileStatus value) throws IOException {
+    writer.append(key, value);
+  }
+
+  /**
+   * Closes streams opened to the chunk-file.
+   */
+  public void close() {
+    IOUtils.cleanup(LOG, reader, writer);
+  }
+
+  /**
+   * Reassigns the chunk to a specified Map-Task, for consumption.
+   * @param taskId The Map-Task to which the chunk is to be reassigned.
+   * @throws IOException Exception on failure to reassign.
+   */
+  public void assignTo(TaskID taskId) throws IOException {
+    Path newPath = new Path(chunkRootPath, taskId.toString());
+    if (!fs.rename(chunkFilePath, newPath)) {
+      LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
+    }
+  }
+
+  private DynamicInputChunk(Path chunkFilePath,
+                            TaskAttemptContext taskAttemptContext)
+                                   throws IOException, InterruptedException {
+    if (!areInvariantsInitialized())
+      initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+    this.chunkFilePath = chunkFilePath;
+    openForRead(taskAttemptContext);
+  }
+
+  private void openForRead(TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+    reader = new SequenceFileRecordReader<K, V>();
+    reader.initialize(new FileSplit(chunkFilePath, 0,
+            DistCpUtils.getFileSize(chunkFilePath, configuration), null),
+            taskAttemptContext);
+  }
+
+  /**
+   * Factory method that
+   * 1. acquires a chunk for the specified map-task attempt
+   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
+   * @param taskAttemptContext The attempt-context for the map task that's
+   * trying to acquire a chunk.
+   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
+   * attempt-id (from the attempt-context.)
+   * @throws IOException Exception on failure.
+   * @throws InterruptedException Exception on failure.
+   */
+  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
+                                      throws IOException, InterruptedException {
+    if (!areInvariantsInitialized())
+        initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+    String taskId
+            = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
+    Path acquiredFilePath = new Path(chunkRootPath, taskId);
+
+    if (fs.exists(acquiredFilePath)) {
+      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
+      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+    }
+
+    for (FileStatus chunkFile : getListOfChunkFiles()) {
+      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
+        LOG.info(taskId + " acquired " + chunkFile.getPath());
+        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+      }
+      else
+        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
+    }
+
+    return null;
+  }
+
+  /**
+   * Method to be called to relinquish an acquired chunk. All streams open to
+   * the chunk are closed, and the chunk-file is deleted.
+   * @throws IOException Exception thrown on failure to release (i.e. delete)
+   * the chunk file.
+   */
+  public void release() throws IOException {
+    close();
+    if (!fs.delete(chunkFilePath, false)) {
+      LOG.error("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+    }
+  }
+
+  static FileStatus [] getListOfChunkFiles() throws IOException {
+    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
+    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
+    numChunksLeft = chunkFiles.length;
+    return chunkFiles;
+  }
+
+  /**
+   * Getter for the chunk-file's path, on HDFS.
+   * @return The qualified path to the chunk-file.
+   */
+  public Path getPath() {
+    return chunkFilePath;
+  }
+
+  /**
+   * Getter for the record-reader, opened to the chunk-file.
+   * @return Opened Sequence-file reader.
+   */
+  public SequenceFileRecordReader<K,V> getReader() {
+    assert reader != null : "Reader un-initialized!";
+    return reader;
+  }
+
+  /**
+   * Getter for the number of chunk-files left in the chunk-file directory.
+   * Useful to determine how many chunks (and hence, records) are left to be
+   * processed.
+   * @return Before the first scan of the directory, the number returned is -1.
+   * Otherwise, the number of chunk-files seen from the last scan is returned.
+   */
+  public static int getNumChunksLeft() {
+    return numChunksLeft;
+  }
+}
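
The heart of DynamicInputChunk.acquire() is the rename-based claim: several map
tasks may race for the same chunk-file, but FileSystem.rename() lets exactly one
of them win. A simplified, standalone restatement of that protocol (the class
name ChunkClaimSketch and the "*.chunk.*" glob are assumptions for illustration;
pre-assignment and error handling are pared down):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ChunkClaimSketch {
      // Returns the path of the claimed chunk, or null if none are left.
      static Path claim(FileSystem fs, Path chunkDir, String taskId)
          throws IOException {
        Path claimed = new Path(chunkDir, taskId);
        if (fs.exists(claimed)) {
          return claimed;                      // Pre-assigned to this task.
        }
        FileStatus[] candidates = fs.globStatus(new Path(chunkDir, "*.chunk.*"));
        if (candidates == null) {
          return null;
        }
        for (FileStatus candidate : candidates) {
          if (fs.rename(candidate.getPath(), claimed)) {
            return claimed;                    // Won the race for this chunk.
          }
          // Lost the race; another task renamed it first. Try the next one.
        }
        return null;                           // Nothing left to process.
      }
    }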

Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
+/**
+ * DynamicInputFormat implements the "Worker pattern" for DistCp.
+ * Rather than splitting up the copy-list into a set of static splits,
+ * the DynamicInputFormat does the following:
+ * 1. Splits the copy-list into small chunks on the DFS.
+ * 2. Creates a set of empty "dynamic" splits, each of which consumes as many
+ *    chunks as it can.
+ * This arrangement ensures that a single slow mapper won't slow down the entire
+ * job (since the slack will be picked up by other mappers, which consume more
+ * chunks.)
+ * By varying the split-ratio, one can vary chunk sizes to achieve different
+ * performance characteristics.
+ */
+public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
+  private static final Log LOG = LogFactory.getLog(DynamicInputFormat.class);
+
+  private static final String CONF_LABEL_LISTING_SPLIT_RATIO
+          = "mapred.listing.split.ratio";
+  private static final String CONF_LABEL_NUM_SPLITS
+          = "mapred.num.splits";
+  private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
+          = "mapred.num.entries.per.chunk";
+
+  /**
+   * Implementation of InputFormat::getSplits(). This method splits up the
+   * copy-listing file into chunks, and assigns the first batch to different
+   * tasks.
+   * @param jobContext JobContext for the map job.
+   * @return The list of (empty) dynamic input-splits.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    LOG.info("DynamicInputFormat: Getting splits for job:"
+             + jobContext.getJobID());
+    return createSplits(jobContext,
+                        splitCopyListingIntoChunksWithShuffle(jobContext));
+  }
+
+  private List<InputSplit> createSplits(JobContext jobContext,
+                                        List<DynamicInputChunk> chunks)
+          throws IOException {
+    int numMaps = getNumMapTasks(jobContext.getConfiguration());
+
+    final int nSplits = Math.min(numMaps, chunks.size());
+    List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
+    
+    for (int i=0; i< nSplits; ++i) {
+      TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
+      chunks.get(i).assignTo(taskId);
+      splits.add(new FileSplit(chunks.get(i).getPath(), 0,
+          // Use a non-zero length for the FileSplit, to guard against any
+          // future behaviour where zero-sized file-splits are considered
+          // "empty" and skipped over.
+          MIN_RECORDS_PER_CHUNK,
+          null));
+    }
+    DistCpUtils.publish(jobContext.getConfiguration(),
+                        CONF_LABEL_NUM_SPLITS, splits.size());
+    return splits;
+  }
+
+  private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
+
+  private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
+                                    (JobContext context) throws IOException {
+
+    final Configuration configuration = context.getConfiguration();
+    int numRecords = getNumberOfRecords(configuration);
+    int numMaps = getNumMapTasks(configuration);
+    // Number of chunks each map will process, on average.
+    int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
+    validateNumChunksUsing(splitRatio, numMaps);
+
+    int numEntriesPerChunk = (int)Math.ceil((float)numRecords
+                                          /(splitRatio * numMaps));
+    DistCpUtils.publish(context.getConfiguration(),
+                        CONF_LABEL_NUM_ENTRIES_PER_CHUNK,
+                        numEntriesPerChunk);
+
+    final int nChunksTotal = (int)Math.ceil((float)numRecords/numEntriesPerChunk);
+    int nChunksOpenAtOnce
+            = Math.min(N_CHUNKS_OPEN_AT_ONCE_DEFAULT, nChunksTotal);
+
+    Path listingPath = getListingFilePath(configuration);
+    SequenceFile.Reader reader
+            = new SequenceFile.Reader(configuration,
+                                      SequenceFile.Reader.file(listingPath));
+
+    List<DynamicInputChunk> openChunks
+                  = new ArrayList<DynamicInputChunk>();
+    
+    List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
+
+    FileStatus fileStatus = new FileStatus();
+    Text relPath = new Text();
+    int recordCounter = 0;
+    int chunkCount = 0;
+
+    try {
+
+      while (reader.next(relPath, fileStatus)) {
+        if (recordCounter % (nChunksOpenAtOnce*numEntriesPerChunk) == 0) {
+          // All chunks full. Create new chunk-set.
+          closeAll(openChunks);
+          chunksFinal.addAll(openChunks);
+
+          openChunks = createChunks(
+                  configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+
+          chunkCount += openChunks.size();
+
+          nChunksOpenAtOnce = openChunks.size();
+          recordCounter = 0;
+        }
+
+        // Shuffle into open chunks.
+        openChunks.get(recordCounter%nChunksOpenAtOnce).write(relPath, fileStatus);
+        ++recordCounter;
+      }
+
+    } finally {
+      closeAll(openChunks);
+      chunksFinal.addAll(openChunks);
+      IOUtils.closeStream(reader);
+    }
+
+    LOG.info("Number of dynamic-chunk-files created: " + chunksFinal.size()); 
+    return chunksFinal;
+  }
+
+  private static void validateNumChunksUsing(int splitRatio, int numMaps)
+                                              throws IOException {
+    if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+      throw new IOException("Too many chunks created with splitRatio:"
+                 + splitRatio + ", numMaps:" + numMaps
+                 + ". Reduce numMaps or decrease split-ratio to proceed.");
+  }
+
+  private static void closeAll(List<DynamicInputChunk> chunks) {
+    for (DynamicInputChunk chunk: chunks)
+      chunk.close();
+  }
+
+  private static List<DynamicInputChunk> createChunks(Configuration config,
+                      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
+                                          throws IOException {
+    List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
+    int chunkIdUpperBound
+            = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
+
+    // If there will be fewer than nChunksOpenAtOnce chunks left after
+    // the current batch of chunks, fold the remaining chunks into
+    // the current batch.
+    if (nChunksTotal - chunkIdUpperBound < nChunksOpenAtOnce)
+      chunkIdUpperBound = nChunksTotal;
+
+    for (int i=chunkCount; i < chunkIdUpperBound; ++i)
+      chunks.add(createChunk(i, config));
+    return chunks;
+  }
+
+  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+                                              throws IOException {
+    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
+                                              config);
+  }
+
+
+  private static Path getListingFilePath(Configuration configuration) {
+    String listingFilePathString = configuration.get(
+            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+    assert !listingFilePathString.equals("") : "Listing file not found.";
+
+    Path listingFilePath = new Path(listingFilePathString);
+    try {
+      assert listingFilePath.getFileSystem(configuration)
+              .exists(listingFilePath) : "Listing file: " + listingFilePath +
+                                          " not found.";
+    } catch (IOException e) {
+      assert false :   "Listing file: " + listingFilePath
+                    + " couldn't be accessed. " + e.getMessage();
+    }
+    return listingFilePath;
+  }
+
+  private static int getNumberOfRecords(Configuration configuration) {
+    return DistCpUtils.getInt(configuration,
+                              DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+  }
+
+  private static int getNumMapTasks(Configuration configuration) {
+    return DistCpUtils.getInt(configuration,
+                              JobContext.NUM_MAPS);
+  }
+
+  private static int getListingSplitRatio(Configuration configuration,
+                                            int numMaps, int numPaths) {
+    return configuration.getInt(
+            CONF_LABEL_LISTING_SPLIT_RATIO,
+            getSplitRatio(numMaps, numPaths));
+  }
+
+  private static final int MAX_CHUNKS_TOLERABLE = 400;
+  private static final int MAX_CHUNKS_IDEAL     = 100;
+  private static final int MIN_RECORDS_PER_CHUNK = 5;
+  private static final int SPLIT_RATIO_DEFAULT  = 2;
+
+  /**
+   * Package private, for testability.
+   * @param nMaps The number of maps requested.
+   * @param nRecords The number of records to be copied.
+   * @return The number of splits each map should handle, ideally.
+   */
+  static int getSplitRatio(int nMaps, int nRecords) {
+    if (nMaps == 1) {
+      LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
+      return 1;
+    }
+
+    if (nMaps > MAX_CHUNKS_IDEAL)
+      return SPLIT_RATIO_DEFAULT;
+
+    int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+    int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
+
+    return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
+              SPLIT_RATIO_DEFAULT : nPickups;
+  }
+
+  static int getNumEntriesPerChunk(Configuration configuration) {
+    return DistCpUtils.getInt(configuration,
+                              CONF_LABEL_NUM_ENTRIES_PER_CHUNK);
+  }
+
+
+  /**
+   * Implementation of InputFormat::createRecordReader().
+   * @param inputSplit The split for which the RecordReader is required.
+   * @param taskAttemptContext TaskAttemptContext for the current attempt.
+   * @return DynamicRecordReader instance.
+   * @throws IOException, on failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public RecordReader<K, V> createRecordReader(
+          InputSplit inputSplit,
+          TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+    return new DynamicRecordReader<K, V>();
+  }
+}
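
As a worked example of the getSplitRatio() arithmetic above, restated as a
standalone snippet (the class name SplitRatioSketch is invented; the constants
are copied from the class): with 20 maps and 10,000 records, each map ideally
picks up ceil(100/20) = 5 chunks, each chunk holds ceil(10000 / (20 * 5)) = 100
records, and since 100 >= MIN_RECORDS_PER_CHUNK the split-ratio is 5.

    class SplitRatioSketch {
      // Mirrors getSplitRatio(): MAX_CHUNKS_IDEAL = 100,
      // MIN_RECORDS_PER_CHUNK = 5, SPLIT_RATIO_DEFAULT = 2.
      static int splitRatio(int nMaps, int nRecords) {
        if (nMaps == 1) return 1;
        if (nMaps > 100) return 2;                        // Too many maps: default ratio.
        int nPickups = (int) Math.ceil(100.0 / nMaps);    // Chunks per map, ideally.
        int nRecordsPerChunk = (int) Math.ceil((double) nRecords / (nMaps * nPickups));
        return nRecordsPerChunk < 5 ? 2 : nPickups;       // Tiny chunks => default ratio.
      }

      public static void main(String[] args) {
        System.out.println(splitRatio(20, 10000));  // Prints 5.
      }
    }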