Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/01/26 07:36:54 UTC
svn commit: r1236045 [2/5] - in /hadoop/common/trunk: hadoop-project/
hadoop-tools/ hadoop-tools/hadoop-distcp/ hadoop-tools/hadoop-distcp/src/
hadoop-tools/hadoop-distcp/src/main/
hadoop-tools/hadoop-distcp/src/main/java/ hadoop-tools/hadoop-distcp/sr...
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.*;
+import java.util.Stack;
+
+/**
+ * The SimpleCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
+ */
+public class SimpleCopyListing extends CopyListing {
+ private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
+
+ private long totalPaths = 0;
+ private long totalBytesToCopy = 0;
+
+ /**
+ * Protected constructor, to initialize configuration.
+ *
+ * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+ * @param credentials - Credentials object on which the FS delegation tokens are cached. If null,
+ * delegation-token caching is skipped.
+ */
+ protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
+ super(configuration, credentials);
+ }
+
+ @Override
+ protected void validatePaths(DistCpOptions options)
+ throws IOException, InvalidInputException {
+
+ Path targetPath = options.getTargetPath();
+ FileSystem targetFS = targetPath.getFileSystem(getConf());
+ boolean targetIsFile = targetFS.isFile(targetPath);
+
+ //If target is a file, then source has to be single file
+ if (targetIsFile) {
+ if (options.getSourcePaths().size() > 1) {
+ throw new InvalidInputException("Multiple source being copied to a file: " +
+ targetPath);
+ }
+
+ Path srcPath = options.getSourcePaths().get(0);
+ FileSystem sourceFS = srcPath.getFileSystem(getConf());
+ if (!sourceFS.isFile(srcPath)) {
+ throw new InvalidInputException("Cannot copy " + srcPath +
+ ", which is not a file to " + targetPath);
+ }
+ }
+
+ if (options.shouldAtomicCommit() && targetFS.exists(targetPath)) {
+ throw new InvalidInputException("Target path for atomic-commit already exists: " +
+ targetPath + ". Cannot atomic-commit to pre-existing target-path.");
+ }
+
+ for (Path path: options.getSourcePaths()) {
+ FileSystem fs = path.getFileSystem(getConf());
+ if (!fs.exists(path)) {
+ throw new InvalidInputException(path + " doesn't exist");
+ }
+ }
+
+ /* This is required to allow map tasks to access each of the source
+ clusters. It retrieves the delegation token for each unique
+ file system and adds it to the job's private credential store.
+ */
+ Credentials credentials = getCredentials();
+ if (credentials != null) {
+ Path[] inputPaths = options.getSourcePaths().toArray(new Path[1]);
+ TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+
+ SequenceFile.Writer fileListWriter = null;
+
+ try {
+ fileListWriter = getWriter(pathToListingFile);
+
+ for (Path path: options.getSourcePaths()) {
+ FileSystem sourceFS = path.getFileSystem(getConf());
+ path = makeQualified(path);
+
+ FileStatus rootStatus = sourceFS.getFileStatus(path);
+ Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+ boolean localFile = (rootStatus.getClass() != FileStatus.class);
+
+ FileStatus[] sourceFiles = sourceFS.listStatus(path);
+ if (sourceFiles != null && sourceFiles.length > 0) {
+ for (FileStatus sourceStatus: sourceFiles) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+ }
+ writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+
+ if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+ }
+ traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+ }
+ }
+ } else {
+ writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
+ }
+ }
+ } finally {
+ IOUtils.closeStream(fileListWriter);
+ }
+ }
+
+ private Path computeSourceRootPath(FileStatus sourceStatus,
+ DistCpOptions options) throws IOException {
+
+ Path target = options.getTargetPath();
+ FileSystem targetFS = target.getFileSystem(getConf());
+
+ boolean solitaryFile = options.getSourcePaths().size() == 1
+ && !sourceStatus.isDirectory();
+
+ if (solitaryFile) {
+ if (targetFS.isFile(target) || !targetFS.exists(target)) {
+ return sourceStatus.getPath();
+ } else {
+ return sourceStatus.getPath().getParent();
+ }
+ } else {
+ boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetFS.exists(target)) ||
+ options.shouldSyncFolder() || options.shouldOverwrite();
+
+ return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() :
+ sourceStatus.getPath().getParent();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long getBytesToCopy() {
+ return totalBytesToCopy;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long getNumberOfPaths() {
+ return totalPaths;
+ }
+
+ private Path makeQualified(Path path) throws IOException {
+ final FileSystem fs = path.getFileSystem(getConf());
+ return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ }
+
+ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+ FileSystem fs = pathToListFile.getFileSystem(getConf());
+ if (fs.exists(pathToListFile)) {
+ fs.delete(pathToListFile, false);
+ }
+ return SequenceFile.createWriter(getConf(),
+ SequenceFile.Writer.file(pathToListFile),
+ SequenceFile.Writer.keyClass(Text.class),
+ SequenceFile.Writer.valueClass(FileStatus.class),
+ SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+ }
+
+ private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+ FileStatus fileStatus) throws IOException {
+ return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
+ }
+
+ private static FileStatus[] getChildren(FileSystem fileSystem,
+ FileStatus parent) throws IOException {
+ return fileSystem.listStatus(parent.getPath());
+ }
+
+ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+ FileStatus sourceStatus,
+ Path sourcePathRoot, boolean localFile)
+ throws IOException {
+ FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+ Stack<FileStatus> pathStack = new Stack<FileStatus>();
+ pathStack.push(sourceStatus);
+
+ while (!pathStack.isEmpty()) {
+ for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Recording source-path: "
+ + sourceStatus.getPath() + " for copy.");
+ writeToFileListing(fileListWriter, child, sourcePathRoot, localFile);
+ if (isDirectoryAndNotEmpty(sourceFS, child)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Traversing non-empty source dir: "
+ + sourceStatus.getPath());
+ pathStack.push(child);
+ }
+ }
+ }
+ }
+
+ private void writeToFileListing(SequenceFile.Writer fileListWriter,
+ FileStatus fileStatus, Path sourcePathRoot,
+ boolean localFile) throws IOException {
+ if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory())
+ return; // Skip the root-paths.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+ fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
+ }
+
+ FileStatus status = fileStatus;
+ if (localFile) {
+ status = getFileStatus(fileStatus);
+ }
+
+ fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
+ fileStatus.getPath())), status);
+ fileListWriter.sync();
+
+ if (!fileStatus.isDirectory()) {
+ totalBytesToCopy += fileStatus.getLen();
+ }
+ totalPaths++;
+ }
+
+ private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
+ private DataInputBuffer in = new DataInputBuffer();
+
+ private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
+ FileStatus status = new FileStatus();
+
+ buffer.reset();
+ DataOutputStream out = new DataOutputStream(buffer);
+ fileStatus.write(out);
+
+ in.reset(buffer.toByteArray(), 0, buffer.size());
+ status.readFields(in);
+ return status;
+ }
+}
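A minimal sketch of how a copy-listing might be built with these classes, mirroring the pattern CopyCommitter.deleteMissing() uses later in this commit. The paths are hypothetical, and SimpleCopyListing is reached indirectly (its constructor is protected) through GlobbedCopyListing, which also handles the wild-cards that SimpleCopyListing does not.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.GlobbedCopyListing;

public class CopyListingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistCpOptions options = new DistCpOptions(
        Arrays.asList(new Path("hdfs://nn1:8020/data/source")),  // hypothetical source
        new Path("hdfs://nn2:8020/data/target"));                // hypothetical target

    // Passing null for Credentials skips delegation-token caching.
    CopyListing listing = new GlobbedCopyListing(conf, null);

    // Writes one <relative-path, FileStatus> record per file/directory found
    // under the source paths into the given sequence file.
    listing.buildListing(new Path("/tmp/distcp-meta/fileList.seq"), options);
  }
}
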
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.tools.*;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.util.DistCpUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+/**
+ * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
+ * responsible for handling the completion/cleanup of the DistCp run.
+ * Specifically, it does the following:
+ * 1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.)
+ * 2. Preservation of user/group/replication-factor on any directories that
+ * have been copied. (Files are taken care of in their map-tasks.)
+ * 3. Atomic-move of data from the temporary work-folder to the final path
+ * (if atomic-commit was opted for).
+ * 4. Deletion of files from the target that are missing at source (if opted for).
+ * 5. Cleanup of any partially copied files, from previous, failed attempts.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+ private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+
+ private final TaskAttemptContext taskAttemptContext;
+
+ /**
+ * Create an output committer
+ *
+ * @param outputPath the job's output path
+ * @param context the task's context
+ * @throws IOException - Exception if any
+ */
+ public CopyCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ this.taskAttemptContext = context;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ super.commitJob(jobContext);
+
+ cleanupTempFiles(jobContext);
+
+ String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+ if (attributes != null && !attributes.isEmpty()) {
+ preserveFileAttributesForDirectories(conf);
+ }
+
+ try {
+ if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+ deleteMissing(conf);
+ } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
+ commitData(conf);
+ }
+ taskAttemptContext.setStatus("Commit Successful");
+ }
+ finally {
+ cleanup(conf);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void abortJob(JobContext jobContext,
+ JobStatus.State state) throws IOException {
+ try {
+ super.abortJob(jobContext, state);
+ } finally {
+ cleanupTempFiles(jobContext);
+ cleanup(jobContext.getConfiguration());
+ }
+ }
+
+ private void cleanupTempFiles(JobContext context) {
+ try {
+ Configuration conf = context.getConfiguration();
+
+ Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ FileSystem targetFS = targetWorkPath.getFileSystem(conf);
+
+ String jobId = context.getJobID().toString();
+ deleteAttemptTempFiles(targetWorkPath, targetFS, jobId);
+ deleteAttemptTempFiles(targetWorkPath.getParent(), targetFS, jobId);
+ } catch (Throwable t) {
+ LOG.warn("Unable to cleanup temp files", t);
+ }
+ }
+
+ private void deleteAttemptTempFiles(Path targetWorkPath,
+ FileSystem targetFS,
+ String jobId) throws IOException {
+
+ FileStatus[] tempFiles = targetFS.globStatus(
+ new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
+
+ if (tempFiles != null && tempFiles.length > 0) {
+ for (FileStatus file : tempFiles) {
+ LOG.info("Cleaning up " + file.getPath());
+ targetFS.delete(file.getPath(), false);
+ }
+ }
+ }
+
+ /**
+ * Cleanup meta folder and other temporary files
+ *
+ * @param conf - Job Configuration
+ */
+ private void cleanup(Configuration conf) {
+ Path metaFolder = new Path(conf.get(DistCpConstants.CONF_LABEL_META_FOLDER));
+ try {
+ FileSystem fs = metaFolder.getFileSystem(conf);
+ LOG.info("Cleaning up temporary work folder: " + metaFolder);
+ fs.delete(metaFolder, true);
+ } catch (IOException ignore) {
+ LOG.error("Exception encountered ", ignore);
+ }
+ }
+
+ // This method changes the target-directories' file-attributes (owner,
+ // user/group permissions, etc.) based on the corresponding source directories.
+ private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+ String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+ LOG.info("About to preserve attributes: " + attrSymbols);
+
+ EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
+
+ Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+ FileSystem clusterFS = sourceListing.getFileSystem(conf);
+ SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(sourceListing));
+ long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
+
+ Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+ long preservedEntries = 0;
+ try {
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+
+ // Iterate over every source path that was copied.
+ while (sourceReader.next(srcRelPath, srcFileStatus)) {
+ // File-attributes for files are set at the time of copy,
+ // in the map-task.
+ if (! srcFileStatus.isDirectory()) continue;
+
+ Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+
+ // Skip the root folder.
+ // Status can't be preserved on root-folder. (E.g. multiple paths may
+ // be copied to a single target folder. Which source-attributes to use
+ // on the target is undefined.)
+ if (targetRoot.equals(targetFile)) continue;
+
+ FileSystem targetFS = targetFile.getFileSystem(conf);
+ DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes);
+ preservedEntries++;
+
+ taskAttemptContext.progress();
+ taskAttemptContext.setStatus("Preserving status on directory entries. [" +
+ sourceReader.getPosition() * 100 / totalLen + "%]");
+ }
+ } finally {
+ IOUtils.closeStream(sourceReader);
+ }
+ LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
+ }
+
+ // This method deletes "extra" files from the target, if they're not
+ // available at the source.
+ private void deleteMissing(Configuration conf) throws IOException {
+ LOG.info("-delete option is enabled. About to remove entries from " +
+ "target that are missing in source");
+
+ // Sort the source-file listing alphabetically.
+ Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+ FileSystem clusterFS = sourceListing.getFileSystem(conf);
+ Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+
+ // Similarly, create the listing of target-files. Sort alphabetically.
+ Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
+ CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+
+ List<Path> targets = new ArrayList<Path>(1);
+ Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ targets.add(targetFinalPath);
+ DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
+
+ target.buildListing(targetListing, options);
+ Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+ long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
+
+ SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(sortedSourceListing));
+ SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(sortedTargetListing));
+
+ // Walk both source and target file listings.
+ // Delete everything from the target that doesn't also exist at the source.
+ long deletedEntries = 0;
+ try {
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+ FileStatus trgtFileStatus = new FileStatus();
+ Text trgtRelPath = new Text();
+
+ FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+ boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+ while (targetReader.next(trgtRelPath, trgtFileStatus)) {
+ // Skip sources that don't exist on target.
+ while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
+ srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+ }
+
+ if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
+
+ // Target doesn't exist at source. Delete.
+ boolean result = (!targetFS.exists(trgtFileStatus.getPath()) ||
+ targetFS.delete(trgtFileStatus.getPath(), true));
+ if (result) {
+ LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
+ deletedEntries++;
+ } else {
+ throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+ }
+ taskAttemptContext.progress();
+ taskAttemptContext.setStatus("Deleting missing files from target. [" +
+ targetReader.getPosition() * 100 / totalLen + "%]");
+ }
+ } finally {
+ IOUtils.closeStream(sourceReader);
+ IOUtils.closeStream(targetReader);
+ }
+ LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+ }
+
+ private void commitData(Configuration conf) throws IOException {
+
+ Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ FileSystem targetFS = workDir.getFileSystem(conf);
+
+ LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);
+ if (targetFS.exists(finalDir) && targetFS.exists(workDir)) {
+ LOG.error("Pre-existing final-path found at: " + finalDir);
+ throw new IOException("Target-path can't be committed to because it " +
+ "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
+ }
+
+ boolean result = targetFS.rename(workDir, finalDir);
+ if (!result) {
+ LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
+ result = targetFS.exists(finalDir) && !targetFS.exists(workDir);
+ }
+ if (result) {
+ LOG.info("Data committed successfully to " + finalDir);
+ taskAttemptContext.setStatus("Data committed successfully to " + finalDir);
+ } else {
+ LOG.error("Unable to commit data to " + finalDir);
+ throw new IOException("Atomic commit failed. Temporary data in " + workDir +
+ ", Unable to move to " + finalDir);
+ }
+ }
+}
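A minimal sketch of the rename-then-verify idiom commitData() uses above for the atomic commit, expressed directly against the FileSystem API; the work and final directories are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AtomicCommitSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path workDir  = new Path("/user/distcp/.distcp.tmp.work");  // hypothetical temp dir
    Path finalDir = new Path("/user/distcp/target");            // hypothetical final dir
    FileSystem fs = workDir.getFileSystem(conf);

    // A single rename is atomic on HDFS: either all of the copied data shows
    // up at finalDir, or none of it does.
    boolean committed = fs.rename(workDir, finalDir);
    if (!committed) {
      // The rename may already have been applied by an earlier attempt; verify.
      committed = fs.exists(finalDir) && !fs.exists(workDir);
    }
    if (!committed) {
      throw new IOException("Atomic commit failed: " + workDir + " -> " + finalDir);
    }
  }
}
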
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.*;
+import java.util.EnumSet;
+import java.util.Arrays;
+
+/**
+ * Mapper class that executes the DistCp copy operation.
+ * Extends the o.a.h.mapreduce.Mapper<> class.
+ */
+public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
+
+ /**
+ * Hadoop counters for the DistCp CopyMapper.
+ * (These have been kept identical to the old DistCp,
+ * for backward compatibility.)
+ */
+ public static enum Counter {
+ COPY, // Number of files received by the mapper for copy.
+ SKIP, // Number of files skipped.
+ FAIL, // Number of files that failed to be copied.
+ BYTESCOPIED, // Number of bytes actually copied by the copy-mapper, total.
+ BYTESEXPECTED,// Number of bytes expected to be copied.
+ BYTESFAILED, // Number of bytes that failed to be copied.
+ BYTESSKIPPED, // Number of bytes that were skipped from copy.
+ }
+
+ private static Log LOG = LogFactory.getLog(CopyMapper.class);
+
+ private Configuration conf;
+
+ private boolean syncFolders = false;
+ private boolean ignoreFailures = false;
+ private boolean skipCrc = false;
+ private boolean overWrite = false;
+ private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
+
+ private FileSystem targetFS = null;
+ private Path targetWorkPath = null;
+
+ /**
+ * Implementation of the Mapper::setup() method. This extracts the DistCp-
+ * options specified in the Job's configuration, to set up the Job.
+ * @param context Mapper's context.
+ * @throws IOException On IO failure.
+ * @throws InterruptedException If the job is interrupted.
+ */
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+
+ syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
+ ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+ skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
+ overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+ preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
+ PRESERVE_STATUS.getConfigLabel()));
+
+ targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ Path targetFinalPath = new Path(conf.get(
+ DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ targetFS = targetFinalPath.getFileSystem(conf);
+
+ if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
+ overWrite = true; // When target is an existing file, overwrite it.
+ }
+
+ if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
+ initializeSSLConf(context);
+ }
+ }
+
+ /**
+ * Initialize the SSL configuration, if one is set in the job conf.
+ *
+ * @throws IOException - If any
+ */
+ private void initializeSSLConf(Context context) throws IOException {
+ LOG.info("Initializing SSL configuration");
+
+ String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
+ Path[] cacheFiles = context.getLocalCacheFiles();
+
+ Configuration sslConfig = new Configuration(false);
+ String sslConfFileName = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
+ Path sslClient = findCacheFile(cacheFiles, sslConfFileName);
+ if (sslClient == null) {
+ LOG.warn("SSL Client config file not found. Was looking for " + sslConfFileName +
+ " in " + Arrays.toString(cacheFiles));
+ return;
+ }
+ sslConfig.addResource(sslClient);
+
+ String trustStoreFile = conf.get("ssl.client.truststore.location");
+ Path trustStorePath = findCacheFile(cacheFiles, trustStoreFile);
+ sslConfig.set("ssl.client.truststore.location", trustStorePath.toString());
+
+ String keyStoreFile = conf.get("ssl.client.keystore.location");
+ Path keyStorePath = findCacheFile(cacheFiles, keyStoreFile);
+ sslConfig.set("ssl.client.keystore.location", keyStorePath.toString());
+
+ try {
+ OutputStream out = new FileOutputStream(workDir + "/" + sslConfFileName);
+ try {
+ sslConfig.writeXml(out);
+ } finally {
+ out.close();
+ }
+ conf.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfFileName);
+ } catch (IOException e) {
+ LOG.warn("Unable to write out the ssl configuration. " +
+ "Will fall back to default ssl-client.xml in class path, if there is one", e);
+ }
+ }
+
+ /**
+ * Find entry from distributed cache
+ *
+ * @param cacheFiles - All localized cache files
+ * @param fileName - fileName to search
+ * @return Path of the filename if found, else null
+ */
+ private Path findCacheFile(Path[] cacheFiles, String fileName) {
+ if (cacheFiles != null && cacheFiles.length > 0) {
+ for (Path file : cacheFiles) {
+ if (file.getName().equals(fileName)) {
+ return file;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Implementation of the Mapper<>::map(). Does the copy.
+ * @param relPath The path of the source file/dir, relative to the source root.
+ * @param sourceFileStatus The FileStatus of the source file/dir.
+ * @throws IOException
+ */
+ @Override
+ public void map(Text relPath, FileStatus sourceFileStatus, Context context)
+ throws IOException, InterruptedException {
+ Path sourcePath = sourceFileStatus.getPath();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
+
+ Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
+ targetFS.getWorkingDirectory()) + relPath.toString());
+
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = getFileAttributeSettings(context);
+
+ final String description = "Copying " + sourcePath + " to " + target;
+ context.setStatus(description);
+
+ LOG.info(description);
+
+ try {
+ FileStatus sourceCurrStatus;
+ FileSystem sourceFS;
+ try {
+ sourceFS = sourcePath.getFileSystem(conf);
+ sourceCurrStatus = sourceFS.getFileStatus(sourcePath);
+ } catch (FileNotFoundException e) {
+ throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
+ }
+
+ FileStatus targetStatus = null;
+
+ try {
+ targetStatus = targetFS.getFileStatus(target);
+ } catch (FileNotFoundException ignore) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Path could not be found: " + target, ignore);
+ }
+
+ if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
+ throw new IOException("Can't replace " + target + ". Target is " +
+ getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
+ }
+
+ if (sourceCurrStatus.isDirectory()) {
+ createTargetDirsWithRetry(description, target, context);
+ return;
+ }
+
+ if (skipFile(sourceFS, sourceCurrStatus, target)) {
+ LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+ + " to " + target);
+ updateSkipCounters(context, sourceCurrStatus);
+ context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
+ }
+ else {
+ copyFileWithRetry(description, sourceCurrStatus, target, context,
+ fileAttributes);
+ }
+
+ DistCpUtils.preserve(target.getFileSystem(conf), target,
+ sourceCurrStatus, fileAttributes);
+
+ } catch (IOException exception) {
+ handleFailures(exception, sourceFileStatus, target, context);
+ }
+ }
+
+ private String getFileType(FileStatus fileStatus) {
+ return fileStatus == null ? "N/A" : (fileStatus.isDirectory() ? "dir" : "file");
+ }
+
+ private static EnumSet<DistCpOptions.FileAttribute>
+ getFileAttributeSettings(Mapper.Context context) {
+ String attributeString = context.getConfiguration().get(
+ DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel());
+ return DistCpUtils.unpackAttributes(attributeString);
+ }
+
+ private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
+ Path target, Context context,
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
+
+ long bytesCopied;
+ try {
+ bytesCopied = (Long)new RetriableFileCopyCommand(description)
+ .execute(sourceFileStatus, target, context, fileAttributes);
+ } catch (Exception e) {
+ context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
+ throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
+ " --> " + target, e);
+ }
+ incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
+ incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
+ incrementCounter(context, Counter.COPY, 1);
+ }
+
+ private void createTargetDirsWithRetry(String description,
+ Path target, Context context) throws IOException {
+ try {
+ new RetriableDirectoryCreateCommand(description).execute(target, context);
+ } catch (Exception e) {
+ throw new IOException("mkdir failed for " + target, e);
+ }
+ incrementCounter(context, Counter.COPY, 1);
+ }
+
+ private static void updateSkipCounters(Context context,
+ FileStatus sourceFile) {
+ incrementCounter(context, Counter.SKIP, 1);
+ incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
+
+ }
+
+ private void handleFailures(IOException exception,
+ FileStatus sourceFileStatus, Path target,
+ Context context) throws IOException, InterruptedException {
+ LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
+ target, exception);
+
+ if (ignoreFailures && exception.getCause() instanceof
+ RetriableFileCopyCommand.CopyReadException) {
+ incrementCounter(context, Counter.FAIL, 1);
+ incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
+ context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
+ StringUtils.stringifyException(exception)));
+ }
+ else
+ throw exception;
+ }
+
+ private static void incrementCounter(Context context, Counter counter,
+ long value) {
+ context.getCounter(counter).increment(value);
+ }
+
+ private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
+ throws IOException {
+ return targetFS.exists(target)
+ && !overWrite
+ && !mustUpdate(sourceFS, source, target);
+ }
+
+ private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
+ throws IOException {
+ final FileStatus targetFileStatus = targetFS.getFileStatus(target);
+
+ return syncFolders
+ && (
+ targetFileStatus.getLen() != source.getLen()
+ || (!skipCrc &&
+ !DistCpUtils.checksumsAreEqual(sourceFS,
+ source.getPath(), targetFS, target))
+ || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
+ preserve.contains(FileAttribute.BLOCKSIZE))
+ );
+ }
+}
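A minimal sketch of the -update (syncFolders) decision that skipFile()/mustUpdate() make above: a file already present on the target is skipped when its length matches and, unless CRC checking is disabled, its checksum matches. It uses only the plain FileSystem API; the helper name and paths are hypothetical, and the real mapper additionally considers -overwrite and preserved block sizes.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SkipDecisionSketch {

  static boolean canSkip(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
                         boolean skipCrc) throws IOException {
    if (!dstFS.exists(dst)) {
      return false;                            // nothing on the target yet
    }
    FileStatus source = srcFS.getFileStatus(src);
    FileStatus target = dstFS.getFileStatus(dst);
    if (source.getLen() != target.getLen()) {
      return false;                            // lengths differ: must copy
    }
    if (skipCrc) {
      return true;                             // skipCrc set: length match suffices
    }
    FileChecksum srcSum = srcFS.getFileChecksum(src);
    FileChecksum dstSum = dstFS.getFileChecksum(dst);
    // getFileChecksum() may return null (e.g. on the local file system); with
    // nothing to compare, fall back to the length check alone.
    return srcSum == null || dstSum == null || srcSum.equals(dstSum);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path src = new Path("hdfs://nn1:8020/data/source/part-00000");   // hypothetical
    Path dst = new Path("hdfs://nn2:8020/data/target/part-00000");   // hypothetical
    System.out.println(canSkip(src.getFileSystem(conf), src,
                               dst.getFileSystem(conf), dst, false));
  }
}
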
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.DistCpConstants;
+
+import java.io.IOException;
+
+/**
+ * The CopyOutputFormat is the Hadoop OutputFormat used in DistCp.
+ * It sets up the Job's Configuration (in the Job-Context) with the settings
+ * for the work-directory, final commit-directory, etc. It also sets the right
+ * output-committer.
+ * @param <K>
+ * @param <V>
+ */
+public class CopyOutputFormat<K, V> extends TextOutputFormat<K, V> {
+
+ /**
+ * Setter for the working directory for DistCp (where files will be copied
+ * before they are moved to the final commit-directory.)
+ * @param job The Job on whose configuration the working-directory is to be set.
+ * @param workingDirectory The path to use as the working directory.
+ */
+ public static void setWorkingDirectory(Job job, Path workingDirectory) {
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+ workingDirectory.toString());
+ }
+
+ /**
+ * Setter for the final directory for DistCp (where files copied will be
+ * moved, atomically.)
+ * @param job The Job on whose configuration the working-directory is to be set.
+ * @param commitDirectory The path to use for final commit.
+ */
+ public static void setCommitDirectory(Job job, Path commitDirectory) {
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+ commitDirectory.toString());
+ }
+
+ /**
+ * Getter for the working directory.
+ * @param job The Job from whose configuration the working-directory is to
+ * be retrieved.
+ * @return The working-directory Path.
+ */
+ public static Path getWorkingDirectory(Job job) {
+ return getWorkingDirectory(job.getConfiguration());
+ }
+
+ private static Path getWorkingDirectory(Configuration conf) {
+ String workingDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
+ if (workingDirectory == null || workingDirectory.isEmpty()) {
+ return null;
+ } else {
+ return new Path(workingDirectory);
+ }
+ }
+
+ /**
+ * Getter for the final commit-directory.
+ * @param job The Job from whose configuration the commit-directory is to be
+ * retrieved.
+ * @return The commit-directory Path.
+ */
+ public static Path getCommitDirectory(Job job) {
+ return getCommitDirectory(job.getConfiguration());
+ }
+
+ private static Path getCommitDirectory(Configuration conf) {
+ String commitDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH);
+ if (commitDirectory == null || commitDirectory.isEmpty()) {
+ return null;
+ } else {
+ return new Path(commitDirectory);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+ return new CopyCommitter(getOutputPath(context), context);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+
+ if (getCommitDirectory(conf) == null) {
+ throw new IllegalStateException("Commit directory not configured");
+ }
+
+ Path workingPath = getWorkingDirectory(conf);
+ if (workingPath == null) {
+ throw new IllegalStateException("Working directory not configured");
+ }
+
+ // get delegation token for outDir's file system
+ TokenCache.obtainTokensForNamenodes(context.getCredentials(),
+ new Path[] {workingPath}, conf);
+ }
+}
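A minimal sketch of wiring CopyOutputFormat into a map-only Job, with hypothetical working/commit directories; the real set-up is performed by the DistCp driver elsewhere in this changeset and involves more configuration than is shown here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;

public class JobWiringSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJobName("distcp-sketch");
    job.setMapperClass(CopyMapper.class);
    job.setNumReduceTasks(0);                                 // copy is map-only
    job.setInputFormatClass(UniformSizeInputFormat.class);
    job.setOutputFormatClass(CopyOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Files are first written under the working directory; CopyCommitter then
    // moves them to the commit directory when the job commits.
    CopyOutputFormat.setWorkingDirectory(job, new Path("/user/distcp/.work"));   // hypothetical
    CopyOutputFormat.setCommitDirectory(job, new Path("/user/distcp/target"));   // hypothetical
  }
}
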
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.tools.util.RetriableCommand;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * This class extends Retriable command to implement the creation of directories
+ * with retries on failure.
+ */
+public class RetriableDirectoryCreateCommand extends RetriableCommand {
+
+ /**
+ * Constructor, taking a description of the action.
+ * @param description Verbose description of the copy operation.
+ */
+ public RetriableDirectoryCreateCommand(String description) {
+ super(description);
+ }
+
+ /**
+ * Implementation of RetriableCommand::doExecute().
+ * This implements the actual mkdirs() functionality.
+ * @param arguments Argument-list to the command.
+ * @return Boolean. True, if the directory could be created successfully.
+ * @throws Exception IOException, on failure to create the directory.
+ */
+ @Override
+ protected Object doExecute(Object... arguments) throws Exception {
+ assert arguments.length == 2 : "Unexpected argument list.";
+ Path target = (Path)arguments[0];
+ Mapper.Context context = (Mapper.Context)arguments[1];
+
+ FileSystem targetFS = target.getFileSystem(context.getConfiguration());
+ return targetFS.mkdirs(target);
+ }
+}
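The retry machinery itself lives in RetriableCommand (org.apache.hadoop.tools.util, added elsewhere in this changeset), which re-invokes doExecute() according to its retry policy. A minimal sketch of the same pattern applied to a different idempotent FileSystem call; this subclass and its argument layout are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.RetriableCommand;

public class RetriableDeleteCommand extends RetriableCommand {

  public RetriableDeleteCommand(String description) {
    super(description);
  }

  /**
   * arguments[0] is the Path to delete, arguments[1] is the Configuration.
   * @return Boolean. True, if the path was deleted.
   */
  @Override
  protected Object doExecute(Object... arguments) throws Exception {
    assert arguments.length == 2 : "Unexpected argument list.";
    Path target = (Path) arguments[0];
    Configuration conf = (Configuration) arguments[1];

    FileSystem fs = target.getFileSystem(conf);
    return fs.delete(target, true);   // recursive delete; retried on failure
  }
}
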
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.tools.util.RetriableCommand;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.DistCpOptions.*;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.EnumSet;
+
+/**
+ * This class extends RetriableCommand to implement the copy of files,
+ * with retries on failure.
+ */
+public class RetriableFileCopyCommand extends RetriableCommand {
+
+ private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
+ private static int BUFFER_SIZE = 8 * 1024;
+
+ /**
+ * Constructor, taking a description of the action.
+ * @param description Verbose description of the copy operation.
+ */
+ public RetriableFileCopyCommand(String description) {
+ super(description);
+ }
+
+ /**
+ * Implementation of RetriableCommand::doExecute().
+ * This is the actual copy-implementation.
+ * @param arguments Argument-list to the command.
+ * @return Number of bytes copied.
+ * @throws Exception: CopyReadException, if there are read-failures. All other
+ * failures are IOExceptions.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Object doExecute(Object... arguments) throws Exception {
+ assert arguments.length == 4 : "Unexpected argument list.";
+ FileStatus source = (FileStatus)arguments[0];
+ assert !source.isDirectory() : "Unexpected file-status. Expected file.";
+ Path target = (Path)arguments[1];
+ Mapper.Context context = (Mapper.Context)arguments[2];
+ EnumSet<FileAttribute> fileAttributes
+ = (EnumSet<FileAttribute>)arguments[3];
+ return doCopy(source, target, context, fileAttributes);
+ }
+
+ private long doCopy(FileStatus sourceFileStatus, Path target,
+ Mapper.Context context,
+ EnumSet<FileAttribute> fileAttributes)
+ throws IOException {
+
+ Path tmpTargetPath = getTmpFile(target, context);
+ final Configuration configuration = context.getConfiguration();
+ FileSystem targetFS = target.getFileSystem(configuration);
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
+ LOG.debug("Tmp-file path: " + tmpTargetPath);
+ }
+ FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
+ configuration);
+ long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
+ context, fileAttributes);
+
+ compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead);
+ compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath);
+ promoteTmpToTarget(tmpTargetPath, target, targetFS);
+ return bytesRead;
+
+ } finally {
+ if (targetFS.exists(tmpTargetPath))
+ targetFS.delete(tmpTargetPath, false);
+ }
+ }
+
+ private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
+ FileStatus sourceFileStatus, Mapper.Context context,
+ EnumSet<FileAttribute> fileAttributes)
+ throws IOException {
+ OutputStream outStream = new BufferedOutputStream(targetFS.create(
+ tmpTargetPath, true, BUFFER_SIZE,
+ getReplicationFactor(fileAttributes, sourceFileStatus, targetFS),
+ getBlockSize(fileAttributes, sourceFileStatus, targetFS), context));
+ return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
+ }
+
+ private void compareFileLengths(FileStatus sourceFileStatus, Path target,
+ Configuration configuration, long bytesRead)
+ throws IOException {
+ final Path sourcePath = sourceFileStatus.getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+ throw new IOException("Mismatch in length of source:" + sourcePath
+ + " and target:" + target);
+ }
+
+ private void compareCheckSums(FileSystem sourceFS, Path source,
+ FileSystem targetFS, Path target)
+ throws IOException {
+ if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
+ throw new IOException("Check-sum mismatch between "
+ + source + " and " + target);
+
+ }
+
+ //If target file exists and unable to delete target - fail
+ //If target doesn't exist and unable to create parent folder - fail
+ //If target is successfully deleted and parent exists, if rename fails - fail
+ private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs)
+ throws IOException {
+ if ((fs.exists(target) && !fs.delete(target, false))
+ || (!fs.exists(target.getParent()) && !fs.mkdirs(target.getParent()))
+ || !fs.rename(tmpTarget, target)) {
+ throw new IOException("Failed to promote tmp-file:" + tmpTarget
+ + " to: " + target);
+ }
+ }
+
+ private Path getTmpFile(Path target, Mapper.Context context) {
+ Path targetWorkPath = new Path(context.getConfiguration().
+ get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+ Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
+ LOG.info("Creating temp file: " +
+ new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
+ return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
+ }
+
+ private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
+ int bufferSize, boolean mustCloseStream,
+ Mapper.Context context) throws IOException {
+ Path source = sourceFileStatus.getPath();
+ byte buf[] = new byte[bufferSize];
+ ThrottledInputStream inStream = null;
+ long totalBytesRead = 0;
+
+ try {
+ inStream = getInputStream(source, context.getConfiguration());
+ int bytesRead = readBytes(inStream, buf);
+ while (bytesRead >= 0) {
+ totalBytesRead += bytesRead;
+ outStream.write(buf, 0, bytesRead);
+ updateContextStatus(totalBytesRead, context, sourceFileStatus);
+ bytesRead = inStream.read(buf);
+ }
+ } finally {
+ if (mustCloseStream)
+ IOUtils.cleanup(LOG, outStream, inStream);
+ }
+
+ return totalBytesRead;
+ }
+
+ private void updateContextStatus(long totalBytesRead, Mapper.Context context,
+ FileStatus sourceFileStatus) {
+ StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
+ .format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
+ message.append("% ")
+ .append(description).append(" [")
+ .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
+ .append('/')
+ .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
+ .append(']');
+ context.setStatus(message.toString());
+ }
+
+ private static int readBytes(InputStream inStream, byte buf[])
+ throws IOException {
+ try {
+ return inStream.read(buf);
+ }
+ catch (IOException e) {
+ throw new CopyReadException(e);
+ }
+ }
+
+ private static ThrottledInputStream getInputStream(Path path, Configuration conf)
+ throws IOException {
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+ DistCpConstants.DEFAULT_BANDWIDTH_MB);
+ return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
+ bandwidthMB * 1024 * 1024);
+ }
+ catch (IOException e) {
+ throw new CopyReadException(e);
+ }
+ }
+
+ private static short getReplicationFactor(
+ EnumSet<FileAttribute> fileAttributes,
+ FileStatus sourceFile, FileSystem targetFS) {
+ return fileAttributes.contains(FileAttribute.REPLICATION)?
+ sourceFile.getReplication() : targetFS.getDefaultReplication();
+ }
+
+ private static long getBlockSize(
+ EnumSet<FileAttribute> fileAttributes,
+ FileStatus sourceFile, FileSystem targetFS) {
+ return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
+ sourceFile.getBlockSize() : targetFS.getDefaultBlockSize();
+ }
+
+ /**
+ * Special subclass of IOException. This is used to distinguish read-operation
+ * failures from other kinds of IOExceptions.
+ * The failure to read from source is dealt with specially, in the CopyMapper.
+ * Such failures may be skipped if the DistCpOptions indicate so.
+ * Write failures are intolerable, and amount to CopyMapper failure.
+ */
+ public static class CopyReadException extends IOException {
+ public CopyReadException(Throwable rootCause) {
+ super(rootCause);
+ }
+ }
+}
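A minimal sketch of the bandwidth capping that getInputStream() applies above, using the ThrottledInputStream added in this changeset; the paths and the 10 MB/s cap are hypothetical.

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.util.ThrottledInputStream;

public class ThrottledCopySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path source = new Path("/data/source/file.bin");     // hypothetical
    Path target = new Path("/data/target/file.bin");     // hypothetical
    FileSystem fs = source.getFileSystem(conf);

    // Reads through the throttled stream never exceed ~10 MB/s.
    ThrottledInputStream in = new ThrottledInputStream(
        new BufferedInputStream(fs.open(source)), 10L * 1024 * 1024);
    OutputStream out = fs.create(target, true);
    try {
      byte[] buf = new byte[8 * 1024];
      for (int n = in.read(buf); n >= 0; n = in.read(buf)) {
        out.write(buf, 0, n);                  // same read/write loop as copyBytes()
      }
    } finally {
      IOUtils.cleanup(null, in, out);          // close both streams; null Log suppresses logging
    }
  }
}
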
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * UniformSizeInputFormat extends the InputFormat<> class, to produce
+ * input-splits for DistCp.
+ * It looks at the copy-listing and groups the contents into input-splits such
+ * that the total-number of bytes to be copied for each input split is
+ * uniform.
+ */
+public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
+ private static final Log LOG
+ = LogFactory.getLog(UniformSizeInputFormat.class);
+
+ /**
+ * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
+ * such that the number of bytes to be copied for all the splits are
+ * approximately equal.
+ * @param context JobContext for the job.
+ * @return The list of uniformly-distributed input-splits.
+ * @throws IOException: On failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration configuration = context.getConfiguration();
+ int numSplits = DistCpUtils.getInt(configuration,
+ JobContext.NUM_MAPS);
+
+ if (numSplits == 0) return new ArrayList<InputSplit>();
+
+ return getSplits(configuration, numSplits,
+ DistCpUtils.getLong(configuration,
+ DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
+ }
+
+ private List<InputSplit> getSplits(Configuration configuration, int numSplits,
+ long totalSizeBytes) throws IOException {
+ List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+ long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
+
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+ long currentSplitSize = 0;
+ long lastSplitStart = 0;
+ long lastPosition = 0;
+
+ final Path listingFilePath = getListingFilePath(configuration);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Average bytes per map: " + nBytesPerSplit +
+ ", Number of maps: " + numSplits + ", total size: " + totalSizeBytes);
+ }
+ SequenceFile.Reader reader = null;
+ try {
+ reader = getListingFileReader(configuration);
+ while (reader.next(srcRelPath, srcFileStatus)) {
+ // If adding the current file would push this split past the per-map
+ // byte limit, close the current split and start a new one for it.
+ if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
+ FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+ lastPosition - lastSplitStart, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug ("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+ }
+ splits.add(split);
+ lastSplitStart = lastPosition;
+ currentSplitSize = 0;
+ }
+ currentSplitSize += srcFileStatus.getLen();
+ lastPosition = reader.getPosition();
+ }
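+ // Whatever remains after the last full split becomes the final split.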
+ if (lastPosition > lastSplitStart) {
+ FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+ lastPosition - lastSplitStart, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.info ("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+ }
+ splits.add(split);
+ }
+
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+
+ return splits;
+ }
+
+ private static Path getListingFilePath(Configuration configuration) {
+ final String listingFilePathString =
+ configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+ assert !listingFilePathString.equals("")
+ : "Couldn't find listing file. Invalid input.";
+ return new Path(listingFilePathString);
+ }
+
+ private SequenceFile.Reader getListingFileReader(Configuration configuration) {
+
+ final Path listingFilePath = getListingFilePath(configuration);
+ try {
+ final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
+ if (!fileSystem.exists(listingFilePath))
+ throw new IllegalArgumentException("Listing file doesn't exist at: "
+ + listingFilePath);
+
+ return new SequenceFile.Reader(configuration,
+ SequenceFile.Reader.file(listingFilePath));
+ }
+ catch (IOException exception) {
+ LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
+ throw new IllegalArgumentException("Couldn't find listing-file at: "
+ + listingFilePath, exception);
+ }
+ }
+
+ /**
+ * Implementation of InputFormat::createRecordReader().
+ * @param split The split for which the RecordReader is sought.
+ * @param context The context of the current task-attempt.
+ * @return A SequenceFileRecordReader instance (since the copy-listing is a
+ * simple sequence-file).
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, FileStatus>();
+ }
+}
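
The uniform-size grouping rule used by getSplits() above can be restated as a
small, self-contained sketch. The file sizes, split count, and class name below
are hypothetical stand-ins (plain longs and lists instead of the copy-listing
SequenceFile); the rule itself is the one in the loop above: close the current
split once adding the next file would exceed the per-split byte quota.

    import java.util.ArrayList;
    import java.util.List;

    /** Sketch of the uniform-size grouping rule; sizes and types are hypothetical. */
    public class UniformGroupingSketch {
      public static void main(String[] args) {
        long[] fileSizes = {20, 30, 25, 25, 30, 20};   // bytes per listed file
        int numSplits = 3;

        long totalBytes = 0;
        for (long size : fileSizes) totalBytes += size;                          // 150
        long bytesPerSplit = (long) Math.ceil(totalBytes * 1.0 / numSplits);     // 50

        List<List<Long>> splits = new ArrayList<List<Long>>();
        List<Long> current = new ArrayList<Long>();
        long currentSize = 0;
        for (long size : fileSizes) {
          // Close the current split once adding this file would exceed the quota.
          if (currentSize + size > bytesPerSplit && !current.isEmpty()) {
            splits.add(current);
            current = new ArrayList<Long>();
            currentSize = 0;
          }
          current.add(size);
          currentSize += size;
        }
        if (!current.isEmpty()) splits.add(current);

        System.out.println(splits);  // [[20, 30], [25, 25], [30, 20]] -- ~50 bytes each
      }
    }

As in getSplits(), a single file larger than the quota still forms its own
split, so the number of splits produced can exceed the requested map count when
individual files are very large.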
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+import java.io.IOException;
+
+/**
+ * The DynamicInputChunk represents a single chunk of work, when used in
+ * conjunction with the DynamicInputFormat and the DynamicRecordReader.
+ * The records in the DynamicInputFormat's input-file are split across various
+ * DynamicInputChunks. Each one is claimed and processed in an iteration of
+ * a dynamic-mapper. When a DynamicInputChunk has been exhausted, a faster
+ * mapper may claim another and process it, until no chunks remain to be
+ * consumed.
+ */
+class DynamicInputChunk<K, V> {
+ private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
+
+ private static Configuration configuration;
+ private static Path chunkRootPath;
+ private static String chunkFilePrefix;
+ private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
+ private static FileSystem fs;
+
+ private Path chunkFilePath;
+ private SequenceFileRecordReader<K, V> reader;
+ private SequenceFile.Writer writer;
+
+ private static void initializeChunkInvariants(Configuration config)
+ throws IOException {
+ configuration = config;
+ Path listingFilePath = new Path(getListingFilePath(configuration));
+ chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
+ fs = chunkRootPath.getFileSystem(configuration);
+ chunkFilePrefix = listingFilePath.getName() + ".chunk.";
+ }
+
+ private static String getListingFilePath(Configuration configuration) {
+ final String listingFileString = configuration.get(
+ DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+ assert !listingFileString.equals("") : "Listing file not found.";
+ return listingFileString;
+ }
+
+ private static boolean areInvariantsInitialized() {
+ return chunkRootPath != null;
+ }
+
+ private DynamicInputChunk(String chunkId, Configuration configuration)
+ throws IOException {
+ if (!areInvariantsInitialized())
+ initializeChunkInvariants(configuration);
+
+ chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+ openForWrite();
+ }
+
+
+ private void openForWrite() throws IOException {
+ writer = SequenceFile.createWriter(
+ chunkFilePath.getFileSystem(configuration), configuration,
+ chunkFilePath, Text.class, FileStatus.class,
+ SequenceFile.CompressionType.NONE);
+
+ }
+
+ /**
+ * Factory method to create chunk-files for writing to.
+ * (For instance, when the DynamicInputFormat splits the input-file into
+ * chunks.)
+ * @param chunkId String to identify the chunk.
+ * @param configuration Configuration, describing the location of the listing-
+ * file, file-system for the map-job, etc.
+ * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
+ * incorporating the chunk-id.
+ * @throws IOException Exception on failure to create the chunk.
+ */
+ public static DynamicInputChunk createChunkForWrite(String chunkId,
+ Configuration configuration) throws IOException {
+ return new DynamicInputChunk(chunkId, configuration);
+ }
+
+ /**
+ * Method to write records into a chunk.
+ * @param key Key from the listing file.
+ * @param value Corresponding value from the listing file.
+ * @throws IOException Exception on failure to write to the file.
+ */
+ public void write(Text key, FileStatus value) throws IOException {
+ writer.append(key, value);
+ }
+
+ /**
+ * Closes streams opened to the chunk-file.
+ */
+ public void close() {
+ IOUtils.cleanup(LOG, reader, writer);
+ }
+
+ /**
+ * Reassigns the chunk to a specified Map-Task, for consumption.
+ * @param taskId The Map-Task to which the chunk is to be reassigned.
+ * @throws IOException Exception on failure to reassign.
+ */
+ public void assignTo(TaskID taskId) throws IOException {
+ Path newPath = new Path(chunkRootPath, taskId.toString());
+ if (!fs.rename(chunkFilePath, newPath)) {
+ LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
+ }
+ }
+
+ private DynamicInputChunk(Path chunkFilePath,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ if (!areInvariantsInitialized())
+ initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+ this.chunkFilePath = chunkFilePath;
+ openForRead(taskAttemptContext);
+ }
+
+ private void openForRead(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ reader = new SequenceFileRecordReader<K, V>();
+ reader.initialize(new FileSplit(chunkFilePath, 0,
+ DistCpUtils.getFileSize(chunkFilePath, configuration), null),
+ taskAttemptContext);
+ }
+
+ /**
+ * Factory method that
+ * 1. acquires a chunk for the specified map-task attempt
+ * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
+ * @param taskAttemptContext The attempt-context for the map task that's
+ * trying to acquire a chunk.
+ * @return The acquired dynamic-chunk. The chunk-file is renamed to the
+ * task-id (from the attempt-context).
+ * @throws IOException Exception on failure.
+ * @throws InterruptedException Exception on failure.
+ */
+ public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ if (!areInvariantsInitialized())
+ initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+ String taskId
+ = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
+ Path acquiredFilePath = new Path(chunkRootPath, taskId);
+
+ if (fs.exists(acquiredFilePath)) {
+ LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
+ return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+ }
+
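+ // No pre-assigned chunk. Race with other tasks to claim a chunk-file;
+ // the rename succeeds for exactly one claimant.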
+ for (FileStatus chunkFile : getListOfChunkFiles()) {
+ if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
+ LOG.info(taskId + " acquired " + chunkFile.getPath());
+ return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+ }
+ else
+ LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
+ }
+
+ return null;
+ }
+
+ /**
+ * Method to be called to relinquish an acquired chunk. All streams open to
+ * the chunk are closed, and the chunk-file is deleted.
+ * @throws IOException Exception thrown on failure to release (i.e. delete)
+ * the chunk file.
+ */
+ public void release() throws IOException {
+ close();
+ if (!fs.delete(chunkFilePath, false)) {
+ LOG.error("Unable to release chunk at path: " + chunkFilePath);
+ throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+ }
+ }
+
+ static FileStatus [] getListOfChunkFiles() throws IOException {
+ Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
+ FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
+ numChunksLeft = chunkFiles.length;
+ return chunkFiles;
+ }
+
+ /**
+ * Getter for the chunk-file's path, on HDFS.
+ * @return The qualified path to the chunk-file.
+ */
+ public Path getPath() {
+ return chunkFilePath;
+ }
+
+ /**
+ * Getter for the record-reader, opened to the chunk-file.
+ * @return Opened Sequence-file reader.
+ */
+ public SequenceFileRecordReader<K,V> getReader() {
+ assert reader != null : "Reader un-initialized!";
+ return reader;
+ }
+
+ /**
+ * Getter for the number of chunk-files left in the chunk-file directory.
+ * Useful to determine how many chunks (and hence, records) are left to be
+ * processed.
+ * @return Before the first scan of the directory, the number returned is -1.
+ * Otherwise, the number of chunk-files seen in the last scan is returned.
+ */
+ public static int getNumChunksLeft() {
+ return numChunksLeft;
+ }
+}
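
The correctness of acquire() above rests on rename() being a first-claimant-wins
operation: once one task has renamed a chunk-file, every other task's rename of
that same file fails, so each chunk is handed to at most one mapper. Below is a
minimal, self-contained sketch of that protocol on a local file system; the
directory, file, and task-id names are hypothetical, and DynamicInputChunk uses
FileSystem.rename() on the job's file system rather than java.io.File.

    import java.io.File;
    import java.io.IOException;

    /** Sketch of the claim-by-rename protocol; paths and task-ids are hypothetical. */
    public class ChunkClaimSketch {
      public static void main(String[] args) throws IOException {
        File chunkDir = new File("chunkDir");
        chunkDir.mkdirs();
        File chunk = new File(chunkDir, "fileList.seq.chunk.00000");
        chunk.createNewFile();

        String taskId = "task_201201260001_m_000003";
        File claimed = new File(chunkDir, taskId);

        // Only one claimant can win this rename; a losing task simply moves on
        // and tries the next chunk-file returned by a directory scan.
        if (chunk.renameTo(claimed)) {
          System.out.println(taskId + " acquired " + chunk.getName());
        } else {
          System.out.println(taskId + " lost the race; trying the next chunk");
        }
      }
    }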
Added: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java?rev=1236045&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java Thu Jan 26 06:36:52 2012
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+
+/**
+ * DynamicInputFormat implements the "Worker pattern" for DistCp.
+ * Rather than splitting up the copy-list into a set of static splits,
+ * the DynamicInputFormat does the following:
+ * 1. Splits the copy-list into small chunks on the DFS.
+ * 2. Creates a set of empty "dynamic" splits, each of which consumes as many
+ * chunks as it can.
+ * This arrangement ensures that a single slow mapper won't slow down the entire
+ * job (since the slack will be picked up by other mappers, which consume more
+ * chunks).
+ * By varying the split-ratio, one can vary chunk sizes to achieve different
+ * performance characteristics.
+ */
+public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
+ private static final Log LOG = LogFactory.getLog(DynamicInputFormat.class);
+
+ private static final String CONF_LABEL_LISTING_SPLIT_RATIO
+ = "mapred.listing.split.ratio";
+ private static final String CONF_LABEL_NUM_SPLITS
+ = "mapred.num.splits";
+ private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
+ = "mapred.num.entries.per.chunk";
+
+ /**
+ * Implementation of InputFormat::getSplits(). This method splits up the
+ * copy-listing file into chunks, and assigns the first batch to different
+ * tasks.
+ * @param jobContext JobContext for the map job.
+ * @return The list of (empty) dynamic input-splits.
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ LOG.info("DynamicInputFormat: Getting splits for job:"
+ + jobContext.getJobID());
+ return createSplits(jobContext,
+ splitCopyListingIntoChunksWithShuffle(jobContext));
+ }
+
+ private List<InputSplit> createSplits(JobContext jobContext,
+ List<DynamicInputChunk> chunks)
+ throws IOException {
+ int numMaps = getNumMapTasks(jobContext.getConfiguration());
+
+ final int nSplits = Math.min(numMaps, chunks.size());
+ List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
+
+ for (int i=0; i< nSplits; ++i) {
+ TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
+ chunks.get(i).assignTo(taskId);
+ splits.add(new FileSplit(chunks.get(i).getPath(), 0,
+ // Use a non-zero length for the FileSplit, in case 0-sized file-splits
+ // are ever treated as "empty" and skipped over.
+ MIN_RECORDS_PER_CHUNK,
+ null));
+ }
+ DistCpUtils.publish(jobContext.getConfiguration(),
+ CONF_LABEL_NUM_SPLITS, splits.size());
+ return splits;
+ }
+
+ private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
+
+ private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
+ (JobContext context) throws IOException {
+
+ final Configuration configuration = context.getConfiguration();
+ int numRecords = getNumberOfRecords(configuration);
+ int numMaps = getNumMapTasks(configuration);
+ // Number of chunks each map will process, on average.
+ int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
+ validateNumChunksUsing(splitRatio, numMaps);
+
+ int numEntriesPerChunk = (int)Math.ceil((float)numRecords
+ /(splitRatio * numMaps));
+ DistCpUtils.publish(context.getConfiguration(),
+ CONF_LABEL_NUM_ENTRIES_PER_CHUNK,
+ numEntriesPerChunk);
+
+ final int nChunksTotal = (int)Math.ceil((float)numRecords/numEntriesPerChunk);
+ int nChunksOpenAtOnce
+ = Math.min(N_CHUNKS_OPEN_AT_ONCE_DEFAULT, nChunksTotal);
+
+ Path listingPath = getListingFilePath(configuration);
+ SequenceFile.Reader reader
+ = new SequenceFile.Reader(configuration,
+ SequenceFile.Reader.file(listingPath));
+
+ List<DynamicInputChunk> openChunks
+ = new ArrayList<DynamicInputChunk>();
+
+ List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
+
+ FileStatus fileStatus = new FileStatus();
+ Text relPath = new Text();
+ int recordCounter = 0;
+ int chunkCount = 0;
+
+ try {
+
+ while (reader.next(relPath, fileStatus)) {
+ if (recordCounter % (nChunksOpenAtOnce*numEntriesPerChunk) == 0) {
+ // All chunks full. Create new chunk-set.
+ closeAll(openChunks);
+ chunksFinal.addAll(openChunks);
+
+ openChunks = createChunks(
+ configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+
+ chunkCount += openChunks.size();
+
+ nChunksOpenAtOnce = openChunks.size();
+ recordCounter = 0;
+ }
+
+ // Shuffle into open chunks.
+ openChunks.get(recordCounter%nChunksOpenAtOnce).write(relPath, fileStatus);
+ ++recordCounter;
+ }
+
+ } finally {
+ closeAll(openChunks);
+ chunksFinal.addAll(openChunks);
+ IOUtils.closeStream(reader);
+ }
+
+ LOG.info("Number of dynamic-chunk-files created: " + chunksFinal.size());
+ return chunksFinal;
+ }
+
+ private static void validateNumChunksUsing(int splitRatio, int numMaps)
+ throws IOException {
+ if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+ throw new IOException("Too many chunks created with splitRatio:"
+ + splitRatio + ", numMaps:" + numMaps
+ + ". Reduce numMaps or decrease split-ratio to proceed.");
+ }
+
+ private static void closeAll(List<DynamicInputChunk> chunks) {
+ for (DynamicInputChunk chunk: chunks)
+ chunk.close();
+ }
+
+ private static List<DynamicInputChunk> createChunks(Configuration config,
+ int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
+ throws IOException {
+ List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
+ int chunkIdUpperBound
+ = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
+
+ // If there will be fewer than nChunksOpenAtOnce chunks left after
+ // the current batch of chunks, fold the remaining chunks into
+ // the current batch.
+ if (nChunksTotal - chunkIdUpperBound < nChunksOpenAtOnce)
+ chunkIdUpperBound = nChunksTotal;
+
+ for (int i=chunkCount; i < chunkIdUpperBound; ++i)
+ chunks.add(createChunk(i, config));
+ return chunks;
+ }
+
+ private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+ throws IOException {
+ return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
+ config);
+ }
+
+
+ private static Path getListingFilePath(Configuration configuration) {
+ String listingFilePathString = configuration.get(
+ DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+ assert !listingFilePathString.equals("") : "Listing file not found.";
+
+ Path listingFilePath = new Path(listingFilePathString);
+ try {
+ assert listingFilePath.getFileSystem(configuration)
+ .exists(listingFilePath) : "Listing file: " + listingFilePath +
+ " not found.";
+ } catch (IOException e) {
+ assert false : "Listing file: " + listingFilePath
+ + " couldn't be accessed. " + e.getMessage();
+ }
+ return listingFilePath;
+ }
+
+ private static int getNumberOfRecords(Configuration configuration) {
+ return DistCpUtils.getInt(configuration,
+ DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
+ }
+
+ private static int getNumMapTasks(Configuration configuration) {
+ return DistCpUtils.getInt(configuration,
+ JobContext.NUM_MAPS);
+ }
+
+ private static int getListingSplitRatio(Configuration configuration,
+ int numMaps, int numPaths) {
+ return configuration.getInt(
+ CONF_LABEL_LISTING_SPLIT_RATIO,
+ getSplitRatio(numMaps, numPaths));
+ }
+
+ private static final int MAX_CHUNKS_TOLERABLE = 400;
+ private static final int MAX_CHUNKS_IDEAL = 100;
+ private static final int MIN_RECORDS_PER_CHUNK = 5;
+ private static final int SPLIT_RATIO_DEFAULT = 2;
+
+ /**
+ * Package private, for testability.
+ * @param nMaps The number of maps requested.
+ * @param nRecords The number of records to be copied.
+ * @return The number of splits each map should handle, ideally.
+ */
+ static int getSplitRatio(int nMaps, int nRecords) {
+ if (nMaps == 1) {
+ LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
+ return 1;
+ }
+
+ if (nMaps > MAX_CHUNKS_IDEAL)
+ return SPLIT_RATIO_DEFAULT;
+
+ int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+ int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
+
+ return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
+ SPLIT_RATIO_DEFAULT : nPickups;
+ }
+
+ static int getNumEntriesPerChunk(Configuration configuration) {
+ return DistCpUtils.getInt(configuration,
+ CONF_LABEL_NUM_ENTRIES_PER_CHUNK);
+ }
+
+
+ /**
+ * Implementation of InputFormat::createRecordReader().
+ * @param inputSplit The split for which the RecordReader is required.
+ * @param taskAttemptContext TaskAttemptContext for the current attempt.
+ * @return DynamicRecordReader instance.
+ * @throws IOException, on failure.
+ * @throws InterruptedException
+ */
+ @Override
+ public RecordReader<K, V> createRecordReader(
+ InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new DynamicRecordReader<K, V>();
+ }
+}
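
To see how the constants above interact, the chunking arithmetic can be
restated as a standalone sketch. The input figures (20 maps, 10,000 listing
records) and the class name are hypothetical; the constants mirror
MAX_CHUNKS_IDEAL, MIN_RECORDS_PER_CHUNK, and SPLIT_RATIO_DEFAULT, and the
ratio rule follows DynamicInputFormat.getSplitRatio() above.

    /** Standalone restatement of the chunking arithmetic; inputs are hypothetical. */
    public class SplitRatioSketch {
      static final int MAX_CHUNKS_IDEAL = 100;
      static final int MIN_RECORDS_PER_CHUNK = 5;
      static final int SPLIT_RATIO_DEFAULT = 2;

      // Same rule as DynamicInputFormat.getSplitRatio().
      static int getSplitRatio(int nMaps, int nRecords) {
        if (nMaps == 1) return 1;
        if (nMaps > MAX_CHUNKS_IDEAL) return SPLIT_RATIO_DEFAULT;
        int nPickups = (int) Math.ceil((float) MAX_CHUNKS_IDEAL / nMaps);
        int nRecordsPerChunk = (int) Math.ceil((float) nRecords / (nMaps * nPickups));
        return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ? SPLIT_RATIO_DEFAULT : nPickups;
      }

      public static void main(String[] args) {
        int nMaps = 20, nRecords = 10000;

        int splitRatio = getSplitRatio(nMaps, nRecords);                                 // 5
        int entriesPerChunk = (int) Math.ceil((float) nRecords / (splitRatio * nMaps));  // 100
        int totalChunks = (int) Math.ceil((float) nRecords / entriesPerChunk);           // 100

        // 100 chunks of ~100 records each: every one of the 20 maps picks up
        // roughly 5 chunks over the life of the job.
        System.out.println("splitRatio=" + splitRatio
            + ", entriesPerChunk=" + entriesPerChunk
            + ", totalChunks=" + totalChunks);
      }
    }

With these figures, validateNumChunksUsing() would accept the job comfortably,
since splitRatio * numMaps = 100 stays well under MAX_CHUNKS_TOLERABLE (400).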