You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/03/28 06:00:10 UTC
svn commit: r1461952 [4/6] - in /hadoop/common/branches/branch-1: ./ bin/
src/docs/src/documentation/content/xdocs/ src/test/
src/test/org/apache/hadoop/tools/distcp2/
src/test/org/apache/hadoop/tools/distcp2/mapred/
src/test/org/apache/hadoop/tools/di...
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.tools.distcp2.CopyListing.DuplicateFileException;
+import org.apache.hadoop.tools.distcp2.CopyListing.InvalidInputException;
+import org.apache.hadoop.tools.distcp2.mapred.CopyMapper;
+import org.apache.hadoop.tools.distcp2.mapred.CopyOutputFormat;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * DistCp is the main driver-class for DistCpV2.
+ * For command-line use, DistCp::main() orchestrates the parsing of command-line
+ * parameters and the launch of the DistCp job.
+ * For programmatic use, a DistCp object can be constructed by specifying
+ * options (in a DistCpOptions object), and DistCp::execute() may be used to
+ * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
+ * behaviour.
+ */
+public class DistCp extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(DistCp.class);
+
+ // Copy options (source paths, target, flags). Replaced by run() when the
+ // tool is invoked from the command line via ToolRunner.
+ private DistCpOptions inputOptions;
+ // Scratch folder under the job staging directory. Holds the copy listing
+ // (and default logs); deleted by cleanup() if the job is never submitted.
+ private Path metaFolder;
+
+ private static final String PREFIX = "_distcp";
+ private static final String WIP_PREFIX = "._WIP_";
+ private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
+ public static final Random rand = new Random();
+
+ // Set once the MR job has been submitted; guards meta-folder cleanup.
+ private boolean submitted;
+ // FileSystem hosting metaFolder; initialized in execute(), used by cleanup().
+ private FileSystem jobFS;
+
+ /**
+ * Public Constructor. Creates DistCp object with specified input-parameters.
+ * (E.g. source-paths, target-location, etc.)
+ * @param inputOptions Options (indicating source-paths, target-location.)
+ * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+ * @throws Exception on failure (e.g. if the meta-folder path cannot be determined).
+ */
+ public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+ final JobConf config = new JobConf(configuration);
+ config.addResource(DISTCP_DEFAULT_XML);
+ setConf(config);
+ this.inputOptions = inputOptions;
+ this.metaFolder = createMetaFolderPath();
+ }
+
+ /**
+ * To be used with the ToolRunner. Not for public consumption.
+ * (Leaves inputOptions/metaFolder unset; run() populates them.)
+ */
+ private DistCp() {}
+
+ /**
+ * Implementation of Tool::run(). Orchestrates the copy of source file(s)
+ * to target location, by:
+ * 1. Creating a list of files to be copied to target.
+ * 2. Launching a Map-only job to copy the files. (Delegates to execute().)
+ * @param argv List of arguments passed to DistCp, from the ToolRunner.
+ * @return DistCpConstants.SUCCESS (0) on success; otherwise one of the
+ * negative codes INVALID_ARGUMENT, DUPLICATE_INPUT or UNKNOWN_ERROR.
+ */
+ public int run(String[] argv) {
+ if (argv.length < 1) {
+ OptionsParser.usage();
+ return DistCpConstants.INVALID_ARGUMENT;
+ }
+
+ try {
+ inputOptions = (OptionsParser.parse(argv));
+
+ LOG.info("Input Options: " + inputOptions);
+ } catch (Throwable e) {
+ // Catch Throwable: any parse-time problem should print usage, not crash.
+ LOG.error("Invalid arguments: ", e);
+ System.err.println("Invalid arguments: " + e.getMessage());
+ OptionsParser.usage();
+ return DistCpConstants.INVALID_ARGUMENT;
+ }
+
+ try {
+ execute();
+ } catch (InvalidInputException e) {
+ LOG.error("Invalid input: ", e);
+ return DistCpConstants.INVALID_ARGUMENT;
+ } catch (DuplicateFileException e) {
+ LOG.error("Duplicate files in input path: ", e);
+ return DistCpConstants.DUPLICATE_INPUT;
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ return DistCpConstants.UNKNOWN_ERROR;
+ }
+ return DistCpConstants.SUCCESS;
+ }
+
+ /**
+ * Implements the core-execution. Creates the file-list for copy,
+ * and launches the Hadoop-job, to do the copy.
+ * If inputOptions.shouldBlock() is true, waits for job completion;
+ * otherwise returns immediately after submission.
+ * @return Job handle (submitted, possibly still running)
+ * @throws Exception on failure.
+ */
+ public Job execute() throws Exception {
+ assert inputOptions != null;
+ assert getConf() != null;
+
+ Job job = null;
+ try {
+ synchronized(this) {
+ // (Re)create the meta folder — this overwrites the path chosen by the
+ // public constructor — and remember its FileSystem for cleanup().
+ metaFolder = createMetaFolderPath();
+ jobFS = metaFolder.getFileSystem(getConf());
+
+ job = createJob();
+ }
+ createInputFileListing(job);
+
+ job.submit();
+ submitted = true;
+ } finally {
+ // If submission failed (or never happened), remove the meta folder.
+ if (!submitted) {
+ cleanup();
+ }
+ }
+
+ // Record the job-id in the client-side configuration, so programmatic
+ // consumers of the returned Job can look it up.
+ String jobID = job.getJobID().toString();
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+ LOG.info("DistCp job-id: " + jobID);
+ if (inputOptions.shouldBlock()) {
+ job.waitForCompletion(true);
+ }
+ return job;
+ }
+
+ /**
+ * Create Job object for submitting it, with all the configuration
+ *
+ * @return Reference to job object.
+ * @throws IOException - Exception if any
+ */
+ private Job createJob() throws IOException {
+ String jobName = "distcp";
+ Job job = Job.getInstance(getConf());
+ String userChosenName = job.getJobName();
+ if (userChosenName != null)
+ jobName += ": " + userChosenName;
+ job.setJobName(jobName);
+ // Input format comes from the configured copy strategy (e.g. uniform-size).
+ job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+ job.setJarByClass(CopyMapper.class);
+ configureOutputFormat(job);
+
+ // Map-only job: the mappers perform the copies; no reduce phase.
+ job.setMapperClass(CopyMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputFormatClass(CopyOutputFormat.class);
+
+ // Speculative attempts would duplicate copy work; keep it disabled.
+ job.setSpeculativeExecution(false);
+ // Max maps bounds the number of concurrent copy tasks.
+ ((JobConf)job.getConfiguration()).setNumMapTasks(inputOptions.getMaxMaps());
+
+ if (inputOptions.getSslConfigurationFile() != null) {
+ setupSSLConfig(job);
+ }
+
+ inputOptions.appendToConf(job.getConfiguration());
+ return job;
+ }
+
+ /**
+ * Setup ssl configuration on the job configuration to enable hsftp access
+ * from map job. Also copy the ssl configuration file to Distributed cache
+ *
+ * @param job - Reference to job's handle
+ * @throws java.io.IOException - Exception if unable to locate ssl config file
+ */
+ private void setupSSLConfig(Job job) throws IOException {
+ Configuration configuration = job.getConfiguration();
+ // NOTE(review): getResource() returns null when the ssl config file is not on
+ // the classpath; that would surface here as a NullPointerException rather than
+ // the documented IOException — confirm whether that is intended.
+ Path sslConfigPath = new Path(configuration.
+ getResource(inputOptions.getSslConfigurationFile()).toString());
+
+ addSSLFilesToDistCache(job, sslConfigPath);
+ // Both labels are deliberately set to the same bare file name of the ssl
+ // config resource (tasks resolve it from the localized cache).
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
+ }
+
+ /**
+ * Add SSL files to distributed cache. Trust store, key store and ssl config xml
+ *
+ * @param job - Job handle
+ * @param sslConfigPath - ssl Configuration file specified through options
+ * @throws IOException - If any
+ */
+ private void addSSLFilesToDistCache(Job job,
+ Path sslConfigPath) throws IOException {
+ Configuration configuration = job.getConfiguration();
+ FileSystem localFS = FileSystem.getLocal(configuration);
+
+ Configuration sslConf = new Configuration(false);
+ sslConf.addResource(sslConfigPath);
+
+ // Ship the trust store, rewriting its conf entry to the bare file name
+ // (presumably so tasks resolve it relative to the localized cache — confirm).
+ Path localStorePath = getLocalStorePath(sslConf,
+ DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
+ addCacheFile(job, localStorePath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
+ localStorePath.getName());
+
+ // Same treatment for the key store.
+ localStorePath = getLocalStorePath(sslConf,
+ DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
+ addCacheFile(job, localStorePath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+ configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
+ localStorePath.getName());
+
+ // Finally, ship the ssl config xml itself.
+ addCacheFile(job, sslConfigPath.makeQualified(localFS.getUri(),
+ localFS.getWorkingDirectory()).toUri());
+
+ }
+
+ /** Adds a single file URI to the job's distributed cache. */
+ private static void addCacheFile(Job job, URI uri) {
+ DistributedCache.addCacheFile(uri, job.getConfiguration());
+ }
+
+ /**
+ * Get Local Trust store/key store path
+ *
+ * @param sslConf - Config from SSL Client xml
+ * @param storeKey - Key for either trust store or key store
+ * @return - Path where the store is present
+ * @throws IOException - if the store location is not set in the ssl config
+ */
+ private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
+ if (sslConf.get(storeKey) != null) {
+ return new Path(sslConf.get(storeKey));
+ } else {
+ throw new IOException("Store for " + storeKey + " is not set in " +
+ inputOptions.getSslConfigurationFile());
+ }
+ }
+
+ /**
+ * Setup output format appropriately
+ *
+ * @param job - Job handle
+ * @throws IOException - Exception if any
+ */
+ private void configureOutputFormat(Job job) throws IOException {
+ final Configuration configuration = job.getConfiguration();
+ Path targetPath = inputOptions.getTargetPath();
+ FileSystem targetFS = targetPath.getFileSystem(configuration);
+ targetPath = targetPath.makeQualified(targetFS.getUri(),
+ targetFS.getWorkingDirectory());
+
+ if (inputOptions.shouldAtomicCommit()) {
+ // Atomic commit: copy into a randomized work-in-progress directory
+ // (either the user-supplied -tmp path, or a sibling of the target),
+ // to be moved into place on commit.
+ Path workDir = inputOptions.getAtomicWorkPath();
+ if (workDir == null) {
+ workDir = targetPath.getParent();
+ }
+ workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+ + rand.nextInt());
+ FileSystem workFS = workDir.getFileSystem(configuration);
+ // Work and target must be on the same FileSystem for the final move.
+ if (!DistCpUtils.compareFs(targetFS, workFS)) {
+ throw new IllegalArgumentException("Work path " + workDir +
+ " and target path " + targetPath + " are in different file system");
+ }
+ CopyOutputFormat.setWorkingDirectory(job, workDir);
+ } else {
+ CopyOutputFormat.setWorkingDirectory(job, targetPath);
+ }
+ CopyOutputFormat.setCommitDirectory(job, targetPath);
+
+ // Logs default to "_logs" under the (temporary) meta folder when no -log
+ // path is supplied.
+ Path logPath = inputOptions.getLogPath();
+ if (logPath == null) {
+ logPath = new Path(metaFolder, "_logs");
+ } else {
+ LOG.info("DistCp job log path: " + logPath);
+ }
+ CopyOutputFormat.setOutputPath(job, logPath);
+ }
+
+ /**
+ * Create input listing by invoking an appropriate copy listing
+ * implementation. Also add delegation tokens for each path
+ * to job's credential store
+ *
+ * @param job - Handle to job
+ * @return Returns the path where the copy listing is created
+ * @throws IOException - If any
+ */
+ protected Path createInputFileListing(Job job) throws IOException {
+ Path fileListingPath = getFileListingPath();
+ CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
+ job.getCredentials(), inputOptions);
+ copyListing.buildListing(fileListingPath, inputOptions);
+ return fileListingPath;
+ }
+
+ /**
+ * Get default name of the copy listing file. Use the meta folder
+ * to create the copy listing file
+ *
+ * @return - Path where the copy listing file has to be saved
+ * @throws IOException - Exception if any
+ */
+ protected Path getFileListingPath() throws IOException {
+ String fileListPathStr = metaFolder + "/fileList.seq";
+ Path path = new Path(fileListPathStr);
+ // Normalize via URI to collapse any redundant path segments.
+ return new Path(path.toUri().normalize().toString());
+ }
+
+ /**
+ * Create a default working folder for the job, under the
+ * job staging directory
+ *
+ * @return Returns the working folder information
+ * @throws Exception - Exception if any
+ */
+ private Path createMetaFolderPath() throws Exception {
+ final JobConf configuration = (JobConf)getConf();
+ Path stagingDir = JobSubmissionFiles.getStagingDir(
+ new JobClient(configuration), configuration);
+ // Randomized suffix keeps concurrent DistCp runs from colliding.
+ Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
+ if (LOG.isDebugEnabled())
+ LOG.debug("Meta folder location: " + metaFolderPath);
+ configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
+ return metaFolderPath;
+ }
+
+ /**
+ * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
+ * and invokes the DistCp::run() method, via the ToolRunner.
+ * Registers a shutdown hook that deletes the meta folder if the process is
+ * killed before the job is submitted.
+ * @param argv Command-line arguments sent to DistCp.
+ */
+ public static void main(String argv[]) {
+ try {
+ DistCp distCp = new DistCp();
+ Cleanup CLEANUP = new Cleanup(distCp);
+
+ Runtime.getRuntime().addShutdownHook(CLEANUP);
+ System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
+ }
+ catch (Exception e) {
+ LOG.error("Couldn't complete DistCp operation: ", e);
+ System.exit(DistCpConstants.UNKNOWN_ERROR);
+ }
+ }
+
+ /**
+ * Loads properties from distcp-default.xml into configuration
+ * object
+ * @return Configuration which includes properties from distcp-default.xml
+ */
+ private static JobConf getDefaultConf() {
+ JobConf config = new JobConf();
+ config.addResource(DISTCP_DEFAULT_XML);
+ return config;
+ }
+
+ /**
+ * Deletes the meta folder, best-effort: failures are logged, never thrown.
+ * Idempotent — a second call is a no-op once metaFolder is nulled out.
+ */
+ private synchronized void cleanup() {
+ try {
+ if (metaFolder == null) return;
+
+ jobFS.delete(metaFolder, true);
+ metaFolder = null;
+ } catch (IOException e) {
+ LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
+ }
+ }
+
+ /** @return true once the MR job has been successfully submitted. */
+ private boolean isSubmitted() {
+ return submitted;
+ }
+
+ /**
+ * Shutdown hook: removes the meta folder when the JVM exits before the
+ * copy job was submitted (after submission the job owns its scratch data).
+ */
+ private static class Cleanup extends Thread {
+ private final DistCp distCp;
+
+ public Cleanup(DistCp distCp) {
+ this.distCp = distCp;
+ }
+
+ @Override
+ public void run() {
+ if (distCp.isSubmitted()) return;
+
+ distCp.cleanup();
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,107 @@
+package org.apache.hadoop.tools.distcp2;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class to hold commonly used constants.
+ */
+public class DistCpConstants {
+
+ /* Default number of maps to use for DistCp */
+ public static final int DEFAULT_MAPS = 20;
+
+ /* Default bandwidth if none specified, in MB/s per map */
+ public static final int DEFAULT_BANDWIDTH_MB = 100;
+
+ /* Default strategy for copying. Implementation looked up
+ from distcp-default.xml
+ */
+ public static final String UNIFORMSIZE = "uniformsize";
+
+ /**
+ * Constants mapping to command line switches/input options
+ */
+ public static final String CONF_LABEL_ATOMIC_COPY = "distcp.atomic.copy";
+ public static final String CONF_LABEL_WORK_PATH = "distcp.work.path";
+ public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
+ public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
+ public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
+ public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
+ public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+ public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+ public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
+ public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
+ public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
+ public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
+ public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+ public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+
+ /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
+ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
+
+ /* Total number of paths to copy, includes directories. Unfiltered count */
+ public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
+
+ /* SSL keystore resource */
+ public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
+
+ /* If input is specified via -f <source listing>, the file containing the source paths */
+ public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
+
+ /* Directory where the mapreduce job will write to. If not atomic commit, then same
+ as CONF_LABEL_TARGET_FINAL_PATH
+ */
+ public static final String CONF_LABEL_TARGET_WORK_PATH = "distcp.target.work.path";
+
+ /* Directory where the final data will be committed to. If not atomic commit, then same
+ as CONF_LABEL_TARGET_WORK_PATH
+ */
+ public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
+
+ /**
+ * DistCp job id, recorded for consumers of the DistCp job handle
+ */
+ public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";
+
+ /* Meta folder where the job's intermediate data is kept */
+ public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+
+ /* DistCp CopyListing class override param */
+ public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
+
+ /**
+ * Conf label for SSL Trust-store location.
+ */
+ public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
+ = "ssl.client.truststore.location";
+
+ /**
+ * Conf label for SSL Key-store location.
+ */
+ public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
+ = "ssl.client.keystore.location";
+
+ /**
+ * Constants for DistCp return code to shell / consumer of ToolRunner's run
+ */
+ public static final int SUCCESS = 0;
+ public static final int INVALID_ARGUMENT = -1;
+ public static final int DUPLICATE_INPUT = -2;
+ public static final int UNKNOWN_ERROR = -999;
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enumeration mapping configuration keys to distcp command line
+ * options. Each constant pairs a Hadoop configuration label with
+ * the Apache Commons CLI Option that sets it.
+ */
+public enum DistCpOptionSwitch {
+
+ /**
+ * Ignores any failures during copy, and continues with rest.
+ * Logs failures in a file
+ */
+ IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
+ new Option("i", false, "Ignore failures during copy")),
+
+ /**
+ * Preserves status of file/path in the target.
+ * Default behavior with -p, is to preserve replication,
+ * block size, user, group and permission on the target file
+ *
+ * If any of the optional switches are present among rbugp, then
+ * only the corresponding file attribute is preserved
+ *
+ */
+ // NOTE(review): the two help-text literals concatenate without a space,
+ // rendering as "(rbugp)(replication, ..." — confirm whether a space is intended.
+ PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+ new Option("p", true, "preserve status (rbugp)" +
+ "(replication, block-size, user, group, permission)")),
+
+ /**
+ * Update target location by copying only files that are missing
+ * in the target. This can be used to periodically sync two folders
+ * across source and target. Typically used with DELETE_MISSING
+ * Incompatible with ATOMIC_COMMIT
+ */
+ // NOTE(review): "missing" + "files" concatenates to "missingfiles" in the
+ // help text — a trailing space appears to be missing from the first literal.
+ SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
+ new Option("update", false, "Update target, copying only missing" +
+ "files or directories")),
+
+ /**
+ * Deletes missing files in target that are missing from source
+ * This allows the target to be in sync with the source contents
+ * Typically used in conjunction with SYNC_FOLDERS
+ * Incompatible with ATOMIC_COMMIT
+ */
+ DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
+ new Option("delete", false, "Delete from target, " +
+ "files missing in source")),
+
+ /**
+ * Configuration file to use with hftps:// for securely copying
+ * files across clusters. Typically the configuration file contains
+ * truststore/keystore information such as location, password and type
+ */
+ SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
+ new Option("mapredSslConf", true, "Configuration for ssl config file" +
+ ", to use with hftps://")),
+
+ /**
+ * Max number of maps to use during copy. DistCp will split work
+ * as equally as possible among these maps
+ */
+ MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
+ new Option("m", true, "Max number of concurrent maps to use for copy")),
+
+ /**
+ * Source file listing can be provided to DistCp in a file.
+ * This allows DistCp to copy random list of files from source
+ * and copy them to target
+ */
+ SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
+ new Option("f", true, "List of files that need to be copied")),
+
+ /**
+ * Copy all the source files and commit them atomically to the target
+ * This is typically useful in cases where there is a process
+ * polling for availability of a file/dir. This option is incompatible
+ * with SYNC_FOLDERS & DELETE_MISSING
+ */
+ ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
+ new Option("atomic", false, "Commit all changes or none")),
+
+ /**
+ * Work path to be used only in conjunction in Atomic commit
+ */
+ WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
+ new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
+
+ /**
+ * Log path where distcp output logs are written to
+ */
+ LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
+ new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
+
+ /**
+ * Copy strategy is use. This could be dynamic or uniform size etc.
+ * DistCp would use an appropriate input format based on this.
+ */
+ COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
+ new Option("strategy", true, "Copy strategy to use. Default is " +
+ "dividing work based on file sizes")),
+
+ /**
+ * Skip CRC checks between source and target, when determining what
+ * files need to be copied.
+ */
+ SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
+ new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
+ "source and target paths.")),
+
+ /**
+ * Overwrite target-files unconditionally.
+ */
+ OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
+ new Option("overwrite", false, "Choose to overwrite target files " +
+ "unconditionally, even if they exist.")),
+
+ /**
+ * Whether DistCp execution should block until completion.
+ * The switch itself is named "async" (note the help text reads
+ * "Should distcp execution be blocking", which is confusing —
+ * presumably -async requests a non-blocking run; verify in OptionsParser).
+ * No configuration label is associated with this switch.
+ */
+ BLOCKING("",
+ new Option("async", false, "Should distcp execution be blocking")),
+
+ // Deprecated legacy-DistCp switches, retained for CLI compatibility;
+ // they carry no configuration label.
+ FILE_LIMIT("",
+ new Option("filelimit", true, "(Deprecated!) Limit number of files " +
+ "copied to <= n")),
+
+ SIZE_LIMIT("",
+ new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
+ "copied to <= n bytes")),
+
+ /**
+ * Specify bandwidth per map in MB
+ */
+ BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+ new Option("bandwidth", true, "Specify bandwidth per map in MB"));
+
+ // Hadoop configuration key this switch maps to ("" when none).
+ private final String confLabel;
+ // Commons-CLI option describing the command-line switch.
+ private final Option option;
+
+ DistCpOptionSwitch(String confLabel, Option option) {
+ this.confLabel = confLabel;
+ this.option = option;
+ }
+
+ /**
+ * Get Configuration label for the option
+ * @return configuration label name (may be the empty string)
+ */
+ public String getConfigLabel() {
+ return confLabel;
+ }
+
+ /**
+ * Get CLI Option corresponding to the distcp option
+ * @return option
+ */
+ public Option getOption() {
+ return option;
+ }
+
+ /**
+ * Get Switch symbol
+ * @return switch symbol string (the short-option name, e.g. "m")
+ */
+ public String getSwitch() {
+ return option.getOpt();
+ }
+
+ @Override
+ public String toString() {
+ return super.name() + " {" +
+ "confLabel='" + confLabel + '\'' +
+ ", option=" + option + '}';
+ }
+
+ /**
+ * Helper function to add an option to hadoop configuration object
+ * @param conf - Configuration object to include the option
+ * @param option - Option to add
+ * @param value - Value
+ */
+ public static void addToConf(Configuration conf,
+ DistCpOptionSwitch option,
+ String value) {
+ conf.set(option.getConfigLabel(), value);
+ }
+
+ /**
+ * Helper function to set an option to hadoop configuration object
+ * (the value is hard-coded to "true", i.e. the flag is switched on)
+ * @param conf - Configuration object to include the option
+ * @param option - Option to add
+ */
+ public static void addToConf(Configuration conf,
+ DistCpOptionSwitch option) {
+ conf.set(option.getConfigLabel(), "true");
+ }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The Options class encapsulates all DistCp options.
+ * These may be set from command-line (via the OptionsParser)
+ * or may be set manually.
+ */
+public class DistCpOptions {
+
+  // Boolean switches; defaults chosen to match legacy DistCp behavior.
+  private boolean atomicCommit = false;
+  private boolean syncFolder = false;
+  private boolean deleteMissing = false;
+  private boolean ignoreFailures = false;
+  private boolean overwrite = false;
+  private boolean skipCRC = false;
+  private boolean blocking = true;
+
+  private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+
+  private String sslConfigurationFile;
+
+  private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+  private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+
+  private Path atomicWorkPath;
+
+  private Path logPath;
+
+  // Exactly one of sourceFileListing / sourcePaths is set, depending on
+  // which constructor was used.
+  private Path sourceFileListing;
+  private List<Path> sourcePaths;
+
+  private Path targetPath;
+
+  /**
+   * File attributes that DistCp can preserve on copied files. Each
+   * attribute is selected on the command line by its first letter
+   * (R, B, U, G, P), matched case-insensitively via getAttribute().
+   */
+  public static enum FileAttribute{
+    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+
+    /**
+     * Map a single-character symbol to the attribute whose name starts
+     * with it.
+     * @param symbol first letter of the attribute name (case-insensitive)
+     * @return the matching attribute
+     * @throws NoSuchElementException if no attribute starts with the symbol
+     */
+    public static FileAttribute getAttribute(char symbol) {
+      for (FileAttribute attribute : values()) {
+        if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) {
+          return attribute;
+        }
+      }
+      throw new NoSuchElementException("No attribute for " + symbol);
+    }
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourcePaths List of source-paths (including wildcards)
+   *                     to be copied to target.
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
+    assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourcePaths = sourcePaths;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourceFileListing File containing list of source paths
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(Path sourceFileListing, Path targetPath) {
+    assert sourceFileListing != null : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourceFileListing = sourceFileListing;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Copy constructor.
+   * @param that DistCpOptions being copied from.
+   */
+  public DistCpOptions(DistCpOptions that) {
+    if (this != that && that != null) {
+      this.atomicCommit = that.atomicCommit;
+      this.syncFolder = that.syncFolder;
+      this.deleteMissing = that.deleteMissing;
+      this.ignoreFailures = that.ignoreFailures;
+      this.overwrite = that.overwrite;
+      this.skipCRC = that.skipCRC;
+      this.blocking = that.blocking;
+      this.maxMaps = that.maxMaps;
+      this.mapBandwidth = that.mapBandwidth;
+      this.sslConfigurationFile = that.getSslConfigurationFile();
+      this.copyStrategy = that.copyStrategy;
+      // NOTE(review): this aliases 'that's EnumSet rather than copying it;
+      // a later preserve() on either instance is visible to both. Confirm
+      // whether a defensive copy (EnumSet.copyOf) was intended.
+      this.preserveStatus = that.preserveStatus;
+      this.atomicWorkPath = that.getAtomicWorkPath();
+      this.logPath = that.getLogPath();
+      this.sourceFileListing = that.getSourceFileListing();
+      this.sourcePaths = that.getSourcePaths();
+      this.targetPath = that.getTargetPath();
+    }
+  }
+
+  /**
+   * Should the data be committed atomically?
+   *
+   * @return true if data should be committed atomically. false otherwise
+   */
+  public boolean shouldAtomicCommit() {
+    return atomicCommit;
+  }
+
+  /**
+   * Set if data need to be committed atomically
+   *
+   * @param atomicCommit - boolean switch
+   */
+  public void setAtomicCommit(boolean atomicCommit) {
+    validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit);
+    this.atomicCommit = atomicCommit;
+  }
+
+  /**
+   * Should the data be sync'ed between source and target paths?
+   *
+   * @return true if data should be sync'ed up. false otherwise
+   */
+  public boolean shouldSyncFolder() {
+    return syncFolder;
+  }
+
+  /**
+   * Set if source and target folder contents be sync'ed up
+   *
+   * @param syncFolder - boolean switch
+   */
+  public void setSyncFolder(boolean syncFolder) {
+    validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder);
+    this.syncFolder = syncFolder;
+  }
+
+  /**
+   * Should target files missing in source should be deleted?
+   *
+   * @return true if zombie target files are to be removed. false otherwise
+   */
+  public boolean shouldDeleteMissing() {
+    return deleteMissing;
+  }
+
+  /**
+   * Set if files only present in target should be deleted
+   *
+   * @param deleteMissing - boolean switch
+   */
+  public void setDeleteMissing(boolean deleteMissing) {
+    validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
+    this.deleteMissing = deleteMissing;
+  }
+
+  /**
+   * Should failures be logged and ignored during copy?
+   *
+   * @return true if failures are to be logged and ignored. false otherwise
+   */
+  public boolean shouldIgnoreFailures() {
+    return ignoreFailures;
+  }
+
+  /**
+   * Set if failures during copy be ignored
+   * (no cross-option validation needed for this switch)
+   *
+   * @param ignoreFailures - boolean switch
+   */
+  public void setIgnoreFailures(boolean ignoreFailures) {
+    this.ignoreFailures = ignoreFailures;
+  }
+
+  /**
+   * Should DistCp be running in blocking mode
+   *
+   * @return true if should run in blocking, false otherwise
+   */
+  public boolean shouldBlock() {
+    return blocking;
+  }
+
+  /**
+   * Set if DistCp should run blocking or non-blocking
+   *
+   * @param blocking - boolean switch
+   */
+  public void setBlocking(boolean blocking) {
+    this.blocking = blocking;
+  }
+
+  /**
+   * Should files be overwritten always?
+   *
+   * @return true if files in target that may exist before distcp, should always
+   *         be overwritten. false otherwise
+   */
+  public boolean shouldOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * Set if files should always be overwritten on target
+   *
+   * @param overwrite - boolean switch
+   */
+  public void setOverwrite(boolean overwrite) {
+    validate(DistCpOptionSwitch.OVERWRITE, overwrite);
+    this.overwrite = overwrite;
+  }
+
+  /**
+   * Should CRC/checksum check be skipped while checking files are identical
+   *
+   * @return true if checksum check should be skipped while checking files are
+   *         identical. false otherwise
+   */
+  public boolean shouldSkipCRC() {
+    return skipCRC;
+  }
+
+  /**
+   * Set if checksum comparison should be skipped while determining if
+   * source and destination files are identical
+   *
+   * @param skipCRC - boolean switch
+   */
+  public void setSkipCRC(boolean skipCRC) {
+    validate(DistCpOptionSwitch.SKIP_CRC, skipCRC);
+    this.skipCRC = skipCRC;
+  }
+
+  /** Get the max number of maps to use for this copy
+   *
+   * @return Max number of maps
+   */
+  public int getMaxMaps() {
+    return maxMaps;
+  }
+
+  /**
+   * Set the max number of maps to use for copy
+   * (silently clamped to a minimum of 1)
+   *
+   * @param maxMaps - Number of maps
+   */
+  public void setMaxMaps(int maxMaps) {
+    this.maxMaps = Math.max(maxMaps, 1);
+  }
+
+  /** Get the map bandwidth in MB
+   *
+   * @return Bandwidth in MB
+   */
+  public int getMapBandwidth() {
+    return mapBandwidth;
+  }
+
+  /**
+   * Set per map bandwidth
+   * (only assert-checked; OptionsParser rejects non-positive values before
+   * calling this)
+   *
+   * @param mapBandwidth - per map bandwidth
+   */
+  public void setMapBandwidth(int mapBandwidth) {
+    assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
+    this.mapBandwidth = mapBandwidth;
+  }
+
+  /**
+   * Get path where the ssl configuration file is present to use for hftps://
+   *
+   * @return Path on local file system
+   */
+  public String getSslConfigurationFile() {
+    return sslConfigurationFile;
+  }
+
+  /**
+   * Set the SSL configuration file path to use with hftps:// (local path)
+   *
+   * @param sslConfigurationFile - Local ssl config file path
+   */
+  public void setSslConfigurationFile(String sslConfigurationFile) {
+    this.sslConfigurationFile = sslConfigurationFile;
+  }
+
+  /**
+   * Returns an iterator with the list of file attributes to preserve
+   *
+   * @return iterator of file attributes to preserve
+   */
+  public Iterator<FileAttribute> preserveAttributes() {
+    return preserveStatus.iterator();
+  }
+
+  /**
+   * Checks if the input attribute should be preserved or not
+   *
+   * @param attribute - Attribute to check
+   * @return True if attribute should be preserved, false otherwise
+   */
+  public boolean shouldPreserve(FileAttribute attribute) {
+    return preserveStatus.contains(attribute);
+  }
+
+  /**
+   * Add file attributes that need to be preserved. This method may be
+   * called multiple times to add attributes.
+   * NOTE(review): the duplicate scan is redundant -- EnumSet.add() is
+   * already a no-op for an element that is present.
+   *
+   * @param fileAttribute - Attribute to add, one at a time
+   */
+  public void preserve(FileAttribute fileAttribute) {
+    for (FileAttribute attribute : preserveStatus) {
+      if (attribute.equals(fileAttribute)) {
+        return;
+      }
+    }
+    preserveStatus.add(fileAttribute);
+  }
+
+  /** Get work path for atomic commit. If null, the work
+   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
+   *
+   * @return Atomic work path on the target cluster. Null if not set
+   */
+  public Path getAtomicWorkPath() {
+    return atomicWorkPath;
+  }
+
+  /**
+   * Set the work path for atomic commit
+   *
+   * @param atomicWorkPath - Path on the target cluster
+   */
+  public void setAtomicWorkPath(Path atomicWorkPath) {
+    this.atomicWorkPath = atomicWorkPath;
+  }
+
+  /** Get output directory for writing distcp logs. Otherwise logs
+   * are temporarily written to JobStagingDir/_logs and deleted
+   * upon job completion
+   *
+   * @return Log output path on the cluster where distcp job is run
+   */
+  public Path getLogPath() {
+    return logPath;
+  }
+
+  /**
+   * Set the log path where distcp output logs are stored
+   * Uses JobStagingDir/_logs by default
+   *
+   * @param logPath - Path where logs will be saved
+   */
+  public void setLogPath(Path logPath) {
+    this.logPath = logPath;
+  }
+
+  /**
+   * Get the copy strategy to use. Uses appropriate input format
+   *
+   * @return copy strategy to use
+   */
+  public String getCopyStrategy() {
+    return copyStrategy;
+  }
+
+  /**
+   * Set the copy strategy to use. Should map to a strategy implementation
+   * in distcp-default.xml
+   *
+   * @param copyStrategy - copy Strategy to use
+   */
+  public void setCopyStrategy(String copyStrategy) {
+    this.copyStrategy = copyStrategy;
+  }
+
+  /**
+   * File path (hdfs:// or file://) that contains the list of actual
+   * files to copy
+   *
+   * @return - Source listing file path
+   */
+  public Path getSourceFileListing() {
+    return sourceFileListing;
+  }
+
+  /**
+   * Getter for sourcePaths.
+   * @return List of source-paths.
+   */
+  public List<Path> getSourcePaths() {
+    return sourcePaths;
+  }
+
+  /**
+   * Setter for sourcePaths.
+   * @param sourcePaths The new list of source-paths.
+   */
+  public void setSourcePaths(List<Path> sourcePaths) {
+    assert sourcePaths != null && sourcePaths.size() != 0;
+    this.sourcePaths = sourcePaths;
+  }
+
+  /**
+   * Getter for the targetPath.
+   * @return The target-path.
+   */
+  public Path getTargetPath() {
+    return targetPath;
+  }
+
+  /**
+   * Check that the proposed value for one switch is compatible with the
+   * current values of the others. Each local below holds the candidate
+   * value for the switch being set, and the already-stored field value for
+   * every other switch.
+   * NOTE(review): the first message mentions "sync folder or overwrite",
+   * but only syncFolder+atomicCommit is actually rejected here; the
+   * overwrite+atomicCommit combination is not checked -- confirm whether
+   * that is intentional.
+   *
+   * @param option the switch whose value is being set
+   * @param value the proposed value for that switch
+   * @throws IllegalArgumentException on an invalid option combination
+   */
+  public void validate(DistCpOptionSwitch option, boolean value) {
+
+    boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
+        value : this.syncFolder);
+    boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ?
+        value : this.overwrite);
+    boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ?
+        value : this.deleteMissing);
+    boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ?
+        value : this.atomicCommit);
+    boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
+        value : this.skipCRC);
+
+    if (syncFolder && atomicCommit) {
+      throw new IllegalArgumentException("Atomic commit can't be used with " +
+          "sync folder or overwrite options");
+    }
+
+    if (deleteMissing && !(overwrite || syncFolder)) {
+      throw new IllegalArgumentException("Delete missing is applicable " +
+          "only with update or overwrite options");
+    }
+
+    if (overwrite && syncFolder) {
+      throw new IllegalArgumentException("Overwrite and update options are " +
+          "mutually exclusive");
+    }
+
+    if (!syncFolder && skipCRC) {
+      throw new IllegalArgumentException("Skip CRC is valid only with update options");
+    }
+
+  }
+
+  /**
+   * Add options to configuration. These will be used in the Mapper/committer
+   *
+   * @param conf - Configuration object to which the options need to be added
+   */
+  public void appendToConf(Configuration conf) {
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
+        String.valueOf(atomicCommit));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
+        String.valueOf(ignoreFailures));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
+        String.valueOf(syncFolder));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
+        String.valueOf(deleteMissing));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
+        String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
+        String.valueOf(skipCRC));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
+        String.valueOf(mapBandwidth));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+  }
+
+  /**
+   * Utility to easily string-ify Options, for logging.
+   * NOTE(review): several fields (overwrite, skipCRC, blocking,
+   * mapBandwidth, preserveStatus, atomicWorkPath, logPath) are not
+   * included in this rendering.
+   *
+   * @return String representation of the Options.
+   */
+  @Override
+  public String toString() {
+    return "DistCpOptions{" +
+        "atomicCommit=" + atomicCommit +
+        ", syncFolder=" + syncFolder +
+        ", deleteMissing=" + deleteMissing +
+        ", ignoreFailures=" + ignoreFailures +
+        ", maxMaps=" + maxMaps +
+        ", sslConfigurationFile='" + sslConfigurationFile + '\'' +
+        ", copyStrategy='" + copyStrategy + '\'' +
+        ", sourceFileListing=" + sourceFileListing +
+        ", sourcePaths=" + sourcePaths +
+        ", targetPath=" + targetPath +
+        '}';
+  }
+
+  /**
+   * NOTE(review): DistCpOptions does not implement Cloneable, so
+   * Object.clone() will throw CloneNotSupportedException at runtime;
+   * callers should prefer the copy constructor. Confirm intent.
+   */
+  @Override
+  protected DistCpOptions clone() throws CloneNotSupportedException {
+    return (DistCpOptions) super.clone();
+  }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ */
+public class FileBasedCopyListing extends CopyListing {
+
+  // Delegate listing that expands the paths read from the listing file.
+  private final CopyListing globbedListing;
+  /**
+   * Constructor, to initialize base-class.
+   * @param configuration The input Configuration object.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public FileBasedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    globbedListing = new GlobbedCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+    // Intentionally empty: validation happens in the delegate listing
+    // when buildListing() is invoked.
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Iterates over all source paths mentioned in the input-file.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   */
+  @Override
+  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
+    // Work on a copy so the caller's options (source-file-listing based)
+    // are not mutated when we substitute the expanded source paths.
+    DistCpOptions newOption = new DistCpOptions(options);
+    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, newOption);
+  }
+
+  /**
+   * Read the source-listing file, one path per line.
+   * NOTE(review): the stream is read with the platform-default charset
+   * (no explicit charset on InputStreamReader) -- confirm whether UTF-8
+   * should be forced for listing files.
+   * @param sourceListing file containing the source paths
+   * @return the paths listed in the file, in file order
+   * @throws IOException on failure to open or read the listing file
+   */
+  private List<Path> fetchFileList(Path sourceListing) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = sourceListing.getFileSystem(getConf());
+    BufferedReader input = null;
+    try {
+      input = new BufferedReader(new InputStreamReader(fs.open(sourceListing)));
+      String line = input.readLine();
+      while (line != null) {
+        result.add(new Path(line));
+        line = input.readLine();
+      }
+    } finally {
+      // closeStream() swallows close() failures, which is acceptable for a
+      // read-only stream.
+      IOUtils.closeStream(input);
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return globbedListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return globbedListing.getNumberOfPaths();
+  }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * GlobbedCopyListing implements the CopyListing interface, to create the copy
+ * listing-file by "globbing" all specified source paths (wild-cards and all.)
+ */
+public class GlobbedCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(GlobbedCopyListing.class);
+
+  // Delegate that writes the final listing once globs are expanded.
+  private final CopyListing simpleListing;
+  /**
+   * Constructor, to initialize the configuration.
+   * @param configuration The input Configuration object.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public GlobbedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    simpleListing = new SimpleCopyListing(getConf(), credentials) ;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+    // Intentionally empty: doBuildListing() itself rejects empty or
+    // non-matching source paths.
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Creates the copy listing by "globbing" all source-paths.
+   * @param pathToListingFile The location at which the copy-listing file
+   *                           is to be created.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   * @throws InvalidInputException if the source-path list is empty, or if
+   *         any source path matches nothing on its file system
+   */
+  @Override
+  public void doBuildListing(Path pathToListingFile,
+                             DistCpOptions options) throws IOException {
+
+    List<Path> globbedPaths = new ArrayList<Path>();
+    if (options.getSourcePaths().isEmpty()) {
+      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");
+    }
+
+    for (Path p : options.getSourcePaths()) {
+      FileSystem fs = p.getFileSystem(getConf());
+      FileStatus[] inputs = fs.globStatus(p);
+
+      if(inputs != null && inputs.length > 0) {
+        for (FileStatus onePath: inputs) {
+          globbedPaths.add(onePath.getPath());
+        }
+      } else {
+        // globStatus returns null/empty when the pattern matches nothing.
+        throw new InvalidInputException(p + " doesn't exist");
+      }
+    }
+
+    // Hand the expanded (wildcard-free) path list to the simple listing,
+    // on a copy of the options so the caller's list is left untouched.
+    DistCpOptions optionsGlobbed = new DistCpOptions(options);
+    optionsGlobbed.setSourcePaths(globbedPaths);
+    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return simpleListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return simpleListing.getNumberOfPaths();
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+
+import java.util.*;
+
+/**
+ * The OptionsParser parses out the command-line options passed to DistCp,
+ * and interprets those specific to DistCp, to create an Options object.
+ */
+public class OptionsParser {
+
+  private static final Log LOG = LogFactory.getLog(OptionsParser.class);
+
+  // All DistCp switches, registered once at class-load time.
+  private static final Options cliOptions = new Options();
+
+  static {
+    for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding option " + option.getOption());
+      }
+      cliOptions.addOption(option.getOption());
+    }
+  }
+
+  /**
+   * Parser that rewrites a bare preserve-status switch (presumably "-p")
+   * into "-prbugp" before parsing, so that "-p" given with no value is
+   * interpreted as "preserve all attributes" (r, b, u, g, p).
+   */
+  private static class CustomParser extends GnuParser {
+    @Override
+    protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
+      for (int index = 0; index < arguments.length; index++) {
+        if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+          arguments[index] = "-prbugp";
+        }
+      }
+      return super.flatten(options, arguments, stopAtNonOption);
+    }
+  }
+
+  /**
+   * The parse method parses the command-line options, and creates
+   * a corresponding Options object.
+   * @param args Command-line arguments (excluding the options consumed
+   * by the GenericOptionsParser).
+   * @return The Options object, corresponding to the specified command-line.
+   * @throws IllegalArgumentException Thrown if the parse fails.
+   */
+  public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
+
+    CommandLineParser parser = new CustomParser();
+
+    CommandLine command;
+    try {
+      command = parser.parse(cliOptions, args, true);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Unable to parse arguments. " +
+          Arrays.toString(args), e);
+    }
+
+    DistCpOptions option;
+    Path targetPath;
+    List<Path> sourcePaths = new ArrayList<Path>();
+
+    String leftOverArgs[] = command.getArgs();
+    if (leftOverArgs == null || leftOverArgs.length < 1) {
+      throw new IllegalArgumentException("Target path not specified");
+    }
+
+    //Last Argument is the target path
+    targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
+
+    //Copy any source paths in the arguments to the list
+    for (int index = 0; index < leftOverArgs.length - 1; index++) {
+      sourcePaths.add(new Path(leftOverArgs[index].trim()));
+    }
+
+    /* If command has source file listing, use it; else, fall back on source paths in args.
+       If both are present, throw exception and bail. */
+    if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
+      if (!sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Both source file listing and source paths present");
+      }
+      option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
+          SOURCE_FILE_LISTING.getSwitch())), targetPath);
+    } else {
+      if (sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Neither source file listing nor source paths present");
+      }
+      option = new DistCpOptions(sourcePaths, targetPath);
+    }
+
+    //Process all the other option switches and set options appropriately
+    if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
+      option.setIgnoreFailures(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) {
+      option.setAtomicCommit(true);
+    }
+
+    // Work path is only meaningful for atomic commits; reject it otherwise.
+    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
+        option.shouldAtomicCommit()) {
+      String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
+      if (workPath != null && !workPath.isEmpty()) {
+        option.setAtomicWorkPath(new Path(workPath));
+      }
+    } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+      option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) {
+      option.setSyncFolder(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) {
+      option.setOverwrite(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
+      option.setDeleteMissing(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) {
+      option.setSkipCRC(true);
+    }
+
+    // Note the inversion: presence of the "blocking" switch turns
+    // blocking mode OFF (the default is blocking).
+    if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
+      option.setBlocking(false);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+      try {
+        Integer mapBandwidth = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
+        if (mapBandwidth.intValue() <= 0) {
+          throw new IllegalArgumentException("Bandwidth specified is not positive: " +
+              mapBandwidth);
+        }
+        option.setMapBandwidth(mapBandwidth);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
+      option.setSslConfigurationFile(command.
+          getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
+      try {
+        Integer maps = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
+        option.setMaxMaps(maps);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Number of maps is invalid: " +
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+      option.setCopyStrategy(
+          getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+      String attributes =
+          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
+      if (attributes == null || attributes.isEmpty()) {
+        // Bare -p (no attribute string): preserve everything.
+        for (FileAttribute attribute : FileAttribute.values()) {
+          option.preserve(attribute);
+        }
+      } else {
+        // Each character selects one attribute by its first letter.
+        for (int index = 0; index < attributes.length(); index++) {
+          option.preserve(FileAttribute.
+              getAttribute(attributes.charAt(index)));
+        }
+      }
+    }
+
+    // Deprecated: value is validated for shape, then ignored with a warning.
+    // NOTE(review): the trim() below is applied to the switch NAME, not the
+    // value; getVal() already trims the value, so this is harmless but
+    // likely misplaced.
+    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+      String fileLimitString = getVal(command,
+          DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
+      try {
+        Integer.parseInt(fileLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("File-limit is invalid: "
+            + fileLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+          " option. Ignoring.");
+    }
+
+    // Deprecated: same handling (and same misplaced trim()) as FILE_LIMIT.
+    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+      String sizeLimitString = getVal(command,
+          DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
+      try {
+        Long.parseLong(sizeLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Size-limit is invalid: "
+            + sizeLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+          " option. Ignoring.");
+    }
+
+    return option;
+  }
+
+  /**
+   * Fetch and trim the value of a switch; null if the switch has no value.
+   * @param command parsed command line
+   * @param swtch switch whose value is wanted
+   * @return trimmed value, or null
+   */
+  private static String getVal(CommandLine command, String swtch) {
+    String optionValue = command.getOptionValue(swtch);
+    if (optionValue == null) {
+      return null;
+    } else {
+      return optionValue.trim();
+    }
+  }
+
+  /** Print command-line usage for all registered DistCp options. */
+  public static void usage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);
+  }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.*;
+import java.util.Stack;
+
+/**
+ * The SimpleCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
+ */
+public class SimpleCopyListing extends CopyListing {
+ private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
+
+ private long totalPaths = 0;
+ private long totalBytesToCopy = 0;
+
+ /**
+ * Protected constructor, to initialize configuration.
+ *
+ * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+ * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+ * delegation token caching is skipped
+ */
+ protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
+ super(configuration, credentials);
+ }
+
+ @Override
+ protected void validatePaths(DistCpOptions options)
+ throws IOException, InvalidInputException {
+
+ Path targetPath = options.getTargetPath();
+ FileSystem targetFS = targetPath.getFileSystem(getConf());
+ boolean targetIsFile = targetFS.isFile(targetPath);
+
+ //If target is a file, then source has to be single file
+ if (targetIsFile) {
+ if (options.getSourcePaths().size() > 1) {
+ throw new InvalidInputException("Multiple source being copied to a file: " +
+ targetPath);
+ }
+
+ Path srcPath = options.getSourcePaths().get(0);
+ FileSystem sourceFS = srcPath.getFileSystem(getConf());
+ if (!sourceFS.isFile(srcPath)) {
+ throw new InvalidInputException("Cannot copy " + srcPath +
+ ", which is not a file to " + targetPath);
+ }
+ }
+
+ if (options.shouldAtomicCommit() && targetFS.exists(targetPath)) {
+ throw new InvalidInputException("Target path for atomic-commit already exists: " +
+ targetPath + ". Cannot atomic-commit to pre-existing target-path.");
+ }
+
+ for (Path path: options.getSourcePaths()) {
+ FileSystem fs = path.getFileSystem(getConf());
+ if (!fs.exists(path)) {
+ throw new InvalidInputException(path + " doesn't exist");
+ }
+ }
+
+ /* This is requires to allow map tasks to access each of the source
+ clusters. This would retrieve the delegation token for each unique
+ file system and add them to job's private credential store
+ */
+ Credentials credentials = getCredentials();
+ if (credentials != null) {
+ Path[] inputPaths = options.getSourcePaths().toArray(new Path[1]);
+ TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+
+ SequenceFile.Writer fileListWriter = null;
+
+ try {
+ fileListWriter = getWriter(pathToListingFile);
+
+ for (Path path: options.getSourcePaths()) {
+ FileSystem sourceFS = path.getFileSystem(getConf());
+ path = makeQualified(path);
+
+ FileStatus rootStatus = sourceFS.getFileStatus(path);
+ Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+ boolean localFile = (rootStatus.getClass() != FileStatus.class);
+
+ FileStatus[] sourceFiles = sourceFS.listStatus(path);
+ if (sourceFiles != null && sourceFiles.length > 0) {
+ for (FileStatus sourceStatus: sourceFiles) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+ }
+ writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
+ localFile, options);
+
+ if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+ }
+ traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
+ localFile, options);
+ }
+ }
+ } else {
+ writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
+ localFile, options);
+ }
+ }
+ } finally {
+ IOUtils.closeStream(fileListWriter);
+ }
+ }
+
+ private Path computeSourceRootPath(FileStatus sourceStatus,
+ DistCpOptions options) throws IOException {
+
+ Path target = options.getTargetPath();
+ FileSystem targetFS = target.getFileSystem(getConf());
+
+ boolean solitaryFile = options.getSourcePaths().size() == 1
+ && !sourceStatus.isDir();
+
+ if (solitaryFile) {
+ if (targetFS.isFile(target) || !targetFS.exists(target)) {
+ return sourceStatus.getPath();
+ } else {
+ return sourceStatus.getPath().getParent();
+ }
+ } else {
+ boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetFS.exists(target)) ||
+ options.shouldSyncFolder() || options.shouldOverwrite();
+
+ return specialHandling && sourceStatus.isDir() ? sourceStatus.getPath() :
+ sourceStatus.getPath().getParent();
+ }
+ }
+
+ /**
+ * Provide an option to skip copy of a path, Allows for exclusion
+ * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
+ * @param path - Path being considered for copy while building the file listing
+ * @param options - Input options passed during DistCp invocation
+ * @return - True if the path should be considered for copy, false otherwise
+ */
+ protected boolean shouldCopy(Path path, DistCpOptions options) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long getBytesToCopy() {
+ return totalBytesToCopy;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long getNumberOfPaths() {
+ return totalPaths;
+ }
+
+ private Path makeQualified(Path path) throws IOException {
+ final FileSystem fs = path.getFileSystem(getConf());
+ return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ }
+
+ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+ FileSystem fs = pathToListFile.getFileSystem(getConf());
+ if (fs.exists(pathToListFile)) {
+ fs.delete(pathToListFile, false);
+ }
+ return SequenceFile.createWriter(fs, getConf(), pathToListFile,
+ Text.class, FileStatus.class, SequenceFile.CompressionType.NONE);
+ }
+
+ private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+ FileStatus fileStatus) throws IOException {
+ return fileStatus.isDir() && getChildren(fileSystem, fileStatus).length > 0;
+ }
+
+ private static FileStatus[] getChildren(FileSystem fileSystem,
+ FileStatus parent) throws IOException {
+ return fileSystem.listStatus(parent.getPath());
+ }
+
+ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+ FileStatus sourceStatus,
+ Path sourcePathRoot,
+ boolean localFile,
+ DistCpOptions options)
+ throws IOException {
+ FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+ Stack<FileStatus> pathStack = new Stack<FileStatus>();
+ pathStack.push(sourceStatus);
+
+ while (!pathStack.isEmpty()) {
+ for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Recording source-path: "
+ + sourceStatus.getPath() + " for copy.");
+ writeToFileListing(fileListWriter, child, sourcePathRoot,
+ localFile, options);
+ if (isDirectoryAndNotEmpty(sourceFS, child)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Traversing non-empty source dir: "
+ + sourceStatus.getPath());
+ pathStack.push(child);
+ }
+ }
+ }
+ }
+
+ private void writeToFileListing(SequenceFile.Writer fileListWriter,
+ FileStatus fileStatus,
+ Path sourcePathRoot,
+ boolean localFile,
+ DistCpOptions options) throws IOException {
+ if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDir())
+ return; // Skip the root-paths.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+ fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
+ }
+
+ FileStatus status = fileStatus;
+ if (localFile) {
+ status = getFileStatus(fileStatus);
+ }
+
+ if (!shouldCopy(fileStatus.getPath(), options)) {
+ return;
+ }
+
+ fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
+ fileStatus.getPath())), status);
+ fileListWriter.sync();
+
+ if (!fileStatus.isDir()) {
+ totalBytesToCopy += fileStatus.getLen();
+ }
+ totalPaths++;
+ }
+
+ private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
+ private DataInputBuffer in = new DataInputBuffer();
+
+ private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
+ FileStatus status = new FileStatus();
+
+ buffer.reset();
+ DataOutputStream out = new DataOutputStream(buffer);
+ fileStatus.write(out);
+
+ in.reset(buffer.toByteArray(), 0, buffer.size());
+ status.readFields(in);
+ return status;
+ }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.GlobbedCopyListing;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+/**
+ * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
+ * responsible for handling the completion/cleanup of the DistCp run.
+ * Specifically, it does the following:
+ * 1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.)
+ * 2. Preservation of user/group/replication-factor on any directories that
+ * have been copied. (Files are taken care of in their map-tasks.)
+ * 3. Atomic-move of data from the temporary work-folder to the final path
+ * (if atomic-commit was opted for).
+ * 4. Deletion of files from the target that are missing at source (if opted for).
+ * 5. Cleanup of any partially copied files, from previous, failed attempts.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+ private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+
+ private final TaskAttemptContext taskAttemptContext;
+
+ /**
+ * Create a output committer
+ *
+ * @param outputPath the job's output path
+ * @param context the task's context
+ * @throws IOException - Exception if any
+ */
+ public CopyCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ this.taskAttemptContext = context;
+ }
+
+ /** @inheritDoc */
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ super.commitJob(jobContext);
+
+ cleanupTempFiles(jobContext);
+
+ String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+ if (attributes != null && !attributes.isEmpty()) {
+ preserveFileAttributesForDirectories(conf);
+ }
+
+ try {
+ if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+ deleteMissing(conf);
+ } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
+ commitData(conf);
+ }
+ taskAttemptContext.setStatus("Commit Successful");
+ }
+ finally {
+ cleanup(conf);
+ }
+ }
+
+ /** @inheritDoc */
+ @Override
+ public void abortJob(JobContext jobContext,
+ JobStatus.State state) throws IOException {
+ try {
+ super.abortJob(jobContext, state);
+ } finally {
+ cleanupTempFiles(jobContext);
+ cleanup(jobContext.getConfiguration());
+ }
+ }
+
+ private void cleanupTempFiles(JobContext context) {
+ try {
+ Configuration conf = context.getConfiguration();
+
+ Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ FileSystem targetFS = targetWorkPath.getFileSystem(conf);
+
+ String jobId = context.getJobID().toString();
+ deleteAttemptTempFiles(targetWorkPath, targetFS, jobId);
+ deleteAttemptTempFiles(targetWorkPath.getParent(), targetFS, jobId);
+ } catch (Throwable t) {
+ LOG.warn("Unable to cleanup temp files", t);
+ }
+ }
+
+ private void deleteAttemptTempFiles(Path targetWorkPath,
+ FileSystem targetFS,
+ String jobId) throws IOException {
+
+ FileStatus[] tempFiles = targetFS.globStatus(
+ new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
+
+ if (tempFiles != null && tempFiles.length > 0) {
+ for (FileStatus file : tempFiles) {
+ LOG.info("Cleaning up " + file.getPath());
+ targetFS.delete(file.getPath(), false);
+ }
+ }
+ }
+
+ /**
+ * Cleanup meta folder and other temporary files
+ *
+ * @param conf - Job Configuration
+ */
+ private void cleanup(Configuration conf) {
+ Path metaFolder = new Path(conf.get(DistCpConstants.CONF_LABEL_META_FOLDER));
+ try {
+ FileSystem fs = metaFolder.getFileSystem(conf);
+ LOG.info("Cleaning up temporary work folder: " + metaFolder);
+ fs.delete(metaFolder, true);
+ } catch (IOException ignore) {
+ LOG.error("Exception encountered ", ignore);
+ }
+ }
+
+ // This method changes the target-directories' file-attributes (owner,
+ // user/group permissions, etc.) based on the corresponding source directories.
+ private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+ String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+ LOG.info("About to preserve attributes: " + attrSymbols);
+
+ EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
+
+ Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+ FileSystem clusterFS = sourceListing.getFileSystem(conf);
+ SequenceFile.Reader sourceReader = new SequenceFile.Reader(
+ clusterFS, sourceListing, conf);
+ long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
+
+ Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+ long preservedEntries = 0;
+ try {
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+
+ // Iterate over every source path that was copied.
+ while (sourceReader.next(srcRelPath, srcFileStatus)) {
+ // File-attributes for files are set at the time of copy,
+ // in the map-task.
+ if (! srcFileStatus.isDir()) continue;
+
+ Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+
+ // Skip the root folder.
+ // Status can't be preserved on root-folder. (E.g. multiple paths may
+ // be copied to a single target folder. Which source-attributes to use
+ // on the target is undefined.)
+ if (targetRoot.equals(targetFile)) continue;
+
+ FileSystem targetFS = targetFile.getFileSystem(conf);
+ DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes);
+
+ taskAttemptContext.progress();
+ taskAttemptContext.setStatus("Preserving status on directory entries. [" +
+ sourceReader.getPosition() * 100 / totalLen + "%]");
+ }
+ } finally {
+ IOUtils.closeStream(sourceReader);
+ }
+ LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
+ }
+
+ // This method deletes "extra" files from the target, if they're not
+ // available at the source.
+ private void deleteMissing(Configuration conf) throws IOException {
+ LOG.info("-delete option is enabled. About to remove entries from " +
+ "target that are missing in source");
+
+ // Sort the source-file listing alphabetically.
+ Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+ FileSystem clusterFS = sourceListing.getFileSystem(conf);
+ Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+
+ // Similarly, create the listing of target-files. Sort alphabetically.
+ Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
+ CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+
+ List<Path> targets = new ArrayList<Path>(1);
+ Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ targets.add(targetFinalPath);
+ DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
+
+ target.buildListing(targetListing, options);
+ Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+ long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
+
+ SequenceFile.Reader sourceReader = new SequenceFile.Reader(
+ clusterFS, sortedSourceListing, conf);
+ SequenceFile.Reader targetReader = new SequenceFile.Reader(
+ clusterFS, sortedTargetListing, conf);
+
+ // Walk both source and target file listings.
+ // Delete all from target that doesn't also exist on source.
+ long deletedEntries = 0;
+ try {
+ FileStatus srcFileStatus = new FileStatus();
+ Text srcRelPath = new Text();
+ FileStatus trgtFileStatus = new FileStatus();
+ Text trgtRelPath = new Text();
+
+ FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+ boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+ while (targetReader.next(trgtRelPath, trgtFileStatus)) {
+ // Skip sources that don't exist on target.
+ while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
+ srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+ }
+
+ if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
+
+ // Target doesn't exist at source. Delete.
+ boolean result = (!targetFS.exists(trgtFileStatus.getPath()) ||
+ targetFS.delete(trgtFileStatus.getPath(), true));
+ if (result) {
+ LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
+ deletedEntries++;
+ } else {
+ throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+ }
+ taskAttemptContext.progress();
+ taskAttemptContext.setStatus("Deleting missing files from target. [" +
+ targetReader.getPosition() * 100 / totalLen + "%]");
+ }
+ } finally {
+ IOUtils.closeStream(sourceReader);
+ IOUtils.closeStream(targetReader);
+ }
+ LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+ }
+
+ private void commitData(Configuration conf) throws IOException {
+
+ Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+ Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ FileSystem targetFS = workDir.getFileSystem(conf);
+
+ LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);
+ if (targetFS.exists(finalDir) && targetFS.exists(workDir)) {
+ LOG.error("Pre-existing final-path found at: " + finalDir);
+ throw new IOException("Target-path can't be committed to because it " +
+ "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
+ }
+
+ boolean result = targetFS.rename(workDir, finalDir);
+ if (!result) {
+ LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
+ result = targetFS.exists(finalDir) && !targetFS.exists(workDir);
+ }
+ if (result) {
+ LOG.info("Data committed successfully to " + finalDir);
+ taskAttemptContext.setStatus("Data committed successfully to " + finalDir);
+ } else {
+ LOG.error("Unable to commit data to " + finalDir);
+ throw new IOException("Atomic commit failed. Temporary data in " + workDir +
+ ", Unable to move to " + finalDir);
+ }
+ }
+}