You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/03/28 06:00:10 UTC

svn commit: r1461952 [4/6] - in /hadoop/common/branches/branch-1: ./ bin/ src/docs/src/documentation/content/xdocs/ src/test/ src/test/org/apache/hadoop/tools/distcp2/ src/test/org/apache/hadoop/tools/distcp2/mapred/ src/test/org/apache/hadoop/tools/di...

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.tools.distcp2.CopyListing.DuplicateFileException;
+import org.apache.hadoop.tools.distcp2.CopyListing.InvalidInputException;
+import org.apache.hadoop.tools.distcp2.mapred.CopyMapper;
+import org.apache.hadoop.tools.distcp2.mapred.CopyOutputFormat;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * DistCp is the main driver-class for DistCpV2.
+ * For command-line use, DistCp::main() orchestrates the parsing of command-line
+ * parameters and the launch of the DistCp job.
+ * For programmatic use, a DistCp object can be constructed by specifying
+ * options (in a DistCpOptions object), and DistCp::execute() may be used to
+ * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
+ * behaviour.
+ */
+public class DistCp extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(DistCp.class);
+
+  /** Options for the copy; set by the public constructor or parsed in run(). */
+  private DistCpOptions inputOptions;
+  /** Per-job working folder under the staging directory; null once cleaned up. */
+  private Path metaFolder;
+
+  private static final String PREFIX = "_distcp";
+  private static final String WIP_PREFIX = "._WIP_";
+  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
+  /** Source of the random suffixes used in meta-folder and work-directory names. */
+  public static final Random rand = new Random();
+
+  /**
+   * Set once the job has been submitted. Volatile because the shutdown-hook
+   * thread reads it without holding any lock.
+   */
+  private volatile boolean submitted;
+  /** File system of the meta folder; assigned in execute(). */
+  private FileSystem jobFS;
+
+  /**
+   * Builds a DistCp instance for programmatic use.
+   * The supplied configuration is copied into a JobConf, distcp-default.xml is
+   * added to it as a resource, and the meta-folder path is computed up front.
+   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+   * @param inputOptions Options (indicating source-paths, target-location.)
+   * @throws Exception if the meta-folder path cannot be determined.
+   */
+  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+    final JobConf jobConf = new JobConf(configuration);
+    jobConf.addResource(DISTCP_DEFAULT_XML);
+    setConf(jobConf);
+
+    this.inputOptions = inputOptions;
+    this.metaFolder = createMetaFolderPath();
+  }
+
+  /**
+   * No-argument constructor used by main(), which hands the instance to the
+   * ToolRunner. Not for public consumption.
+   */
+  private DistCp() {
+  }
+
+  /**
+   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
+   * to target location, by:
+   *  1. Parsing the arguments into the input options.
+   *  2. Delegating to execute(), which builds the file listing and submits
+   *     the copy job.
+   * @param argv List of arguments passed to DistCp, from the ToolRunner.
+   * @return DistCpConstants.SUCCESS on success. DistCpConstants.INVALID_ARGUMENT
+   *         if no arguments are given, the arguments cannot be parsed, or
+   *         execute() reports invalid input. DistCpConstants.DUPLICATE_INPUT if
+   *         execute() reports duplicate files. DistCpConstants.UNKNOWN_ERROR
+   *         for any other exception.
+   */
+  public int run(String[] argv) {
+    if (argv.length == 0) {
+      OptionsParser.usage();
+      return DistCpConstants.INVALID_ARGUMENT;
+    }
+
+    // Anything that goes wrong while parsing is treated as a usage error.
+    try {
+      inputOptions = OptionsParser.parse(argv);
+      LOG.info("Input Options: " + inputOptions);
+    } catch (Throwable parseFailure) {
+      LOG.error("Invalid arguments: ", parseFailure);
+      System.err.println("Invalid arguments: " + parseFailure.getMessage());
+      OptionsParser.usage();
+      return DistCpConstants.INVALID_ARGUMENT;
+    }
+
+    try {
+      execute();
+      return DistCpConstants.SUCCESS;
+    } catch (InvalidInputException invalidInput) {
+      LOG.error("Invalid input: ", invalidInput);
+      return DistCpConstants.INVALID_ARGUMENT;
+    } catch (DuplicateFileException duplicateFile) {
+      LOG.error("Duplicate files in input path: ", duplicateFile);
+      return DistCpConstants.DUPLICATE_INPUT;
+    } catch (Exception other) {
+      LOG.error("Exception encountered ", other);
+      return DistCpConstants.UNKNOWN_ERROR;
+    }
+  }
+
+  /**
+   * Implements the core-execution. Creates the file-list for copy,
+   * and launches the Hadoop-job, to do the copy.
+   * If the job does not get submitted, cleanup() is invoked before the
+   * failure propagates. When the input options ask for blocking behaviour,
+   * this call waits for the job to complete before returning.
+   * @return Job handle
+   * @throws Exception on failure.
+   */
+  public Job execute() throws Exception {
+    assert inputOptions != null;
+    assert getConf() != null;
+
+    Job job = null;
+    try {
+      // Meta folder, its file system and the job are set up while holding
+      // this object's monitor, the same one the synchronized cleanup() takes.
+      synchronized(this) {
+        metaFolder = createMetaFolderPath();
+        jobFS = metaFolder.getFileSystem(getConf());
+  
+        job = createJob();
+      }
+      createInputFileListing(job);
+
+      job.submit();
+      submitted = true;
+    } finally {
+      // Anything that failed before submission leaves the meta folder behind;
+      // remove it here.
+      if (!submitted) {
+        cleanup();
+      }
+    }
+
+    // NOTE(review): the job id is stored in the client-side configuration
+    // after submit(); presumably only holders of this Job handle see it - confirm.
+    String jobID = job.getJobID().toString();
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+    
+    LOG.info("DistCp job-id: " + jobID);
+    if (inputOptions.shouldBlock()) {
+      job.waitForCompletion(true);
+    }
+    return job;
+  }
+
+  /**
+   * Create the Job object to be submitted, fully configured: job name,
+   * input and output formats, the CopyMapper with zero reducers, the map
+   * count from the input options, optional SSL settings, and finally the
+   * input options themselves appended to the job configuration.
+   *
+   * @return Reference to job object.
+   * @throws IOException - Exception if any
+   */
+  private Job createJob() throws IOException {
+    final Job copyJob = Job.getInstance(getConf());
+
+    // Prefix whatever name is already configured, if any, with "distcp".
+    final String userChosenName = copyJob.getJobName();
+    final String jobName =
+        (userChosenName == null) ? "distcp" : "distcp: " + userChosenName;
+    copyJob.setJobName(jobName);
+
+    copyJob.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+    copyJob.setJarByClass(CopyMapper.class);
+    configureOutputFormat(copyJob);
+
+    // Map-only job: no reducers, speculative execution switched off.
+    copyJob.setMapperClass(CopyMapper.class);
+    copyJob.setNumReduceTasks(0);
+    copyJob.setMapOutputKeyClass(Text.class);
+    copyJob.setMapOutputValueClass(Text.class);
+    copyJob.setOutputFormatClass(CopyOutputFormat.class);
+    copyJob.setSpeculativeExecution(false);
+
+    final JobConf jobConf = (JobConf) copyJob.getConfiguration();
+    jobConf.setNumMapTasks(inputOptions.getMaxMaps());
+
+    final boolean sslRequested = inputOptions.getSslConfigurationFile() != null;
+    if (sslRequested) {
+      setupSSLConfig(copyJob);
+    }
+
+    inputOptions.appendToConf(copyJob.getConfiguration());
+    return copyJob;
+  }
+
+  /**
+   * Setup ssl configuration on the job configuration to enable hsftp access
+   * from map job. Also copy the ssl configuration file to Distributed cache
+   *
+   * @param job - Reference to job's handle
+   * @throws java.io.IOException - Exception if unable to locate ssl config file
+   */
+  private void setupSSLConfig(Job job) throws IOException  {
+    Configuration configuration = job.getConfiguration();
+    final String sslConfigFile = inputOptions.getSslConfigurationFile();
+
+    // getResource() returns null when the resource cannot be found; report
+    // that as the documented IOException rather than a NullPointerException.
+    final java.net.URL sslConfigUrl = configuration.getResource(sslConfigFile);
+    if (sslConfigUrl == null) {
+      throw new IOException("Unable to locate ssl configuration file: "
+          + sslConfigFile);
+    }
+    Path sslConfigPath = new Path(sslConfigUrl.toString());
+
+    addSSLFilesToDistCache(job, sslConfigPath);
+    // Both labels are set to the file's bare name, not its full path.
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
+  }
+
+  /**
+   * Add SSL files to distributed cache: the trust store, the key store and
+   * the ssl config xml, in that order. For each store, the location read from
+   * the ssl config is replaced in the job configuration by the file's bare name.
+   *
+   * @param job - Job handle
+   * @param sslConfigPath - ssl Configuration file specified through options
+   * @throws IOException - If a store location is not set in the ssl config,
+   *                       or on any other I/O failure
+   */
+  private void addSSLFilesToDistCache(Job job,
+                                      Path sslConfigPath) throws IOException {
+    final Configuration jobConfiguration = job.getConfiguration();
+    final FileSystem localFS = FileSystem.getLocal(jobConfiguration);
+
+    // Load only the ssl config file, without the default resources.
+    final Configuration sslConf = new Configuration(false);
+    sslConf.addResource(sslConfigPath);
+
+    final String[] storeKeys = {
+        DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
+        DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION
+    };
+    for (String storeKey : storeKeys) {
+      final Path storePath = getLocalStorePath(sslConf, storeKey);
+      final Path qualifiedStore = storePath.makeQualified(localFS.getUri(),
+          localFS.getWorkingDirectory());
+      addCacheFile(job, qualifiedStore.toUri());
+      jobConfiguration.set(storeKey, storePath.getName());
+    }
+
+    final Path qualifiedSslConfig = sslConfigPath.makeQualified(localFS.getUri(),
+        localFS.getWorkingDirectory());
+    addCacheFile(job, qualifiedSslConfig.toUri());
+  }
+
+  /** Registers the given URI as a distributed-cache file in the job's configuration. */
+  private static void addCacheFile(Job job, URI uri) {
+    final Configuration jobConfiguration = job.getConfiguration();
+    DistributedCache.addCacheFile(uri, jobConfiguration);
+  }
+
+  /**
+   * Look up the local trust-store or key-store path in the ssl client config.
+   *
+   * @param sslConf - Config from SSL Client xml
+   * @param storeKey - Key for either trust store or key store
+   * @return - Path where the store is present
+   * @throws IOException - If the key is not set in the ssl config
+   */
+  private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
+    final String storeLocation = sslConf.get(storeKey);
+    if (storeLocation == null) {
+      throw new IOException("Store for " + storeKey + " is not set in " +
+          inputOptions.getSslConfigurationFile());
+    }
+    return new Path(storeLocation);
+  }
+
+  /**
+   * Setup output format appropriately: the working directory, the commit
+   * directory and the log (output) path of CopyOutputFormat.
+   * With atomic commit, the working directory is a randomly suffixed
+   * work-in-progress folder that must be on the same file system as the
+   * target; otherwise the working directory is the target itself.
+   *
+   * @param job - Job handle
+   * @throws IOException - Exception if any
+   */
+  private void configureOutputFormat(Job job) throws IOException {
+    final Configuration configuration = job.getConfiguration();
+    final Path rawTarget = inputOptions.getTargetPath();
+    final FileSystem targetFS = rawTarget.getFileSystem(configuration);
+    final Path targetPath = rawTarget.makeQualified(targetFS.getUri(),
+                                                    targetFS.getWorkingDirectory());
+
+    if (!inputOptions.shouldAtomicCommit()) {
+      CopyOutputFormat.setWorkingDirectory(job, targetPath);
+    } else {
+      // Default the work folder's parent to the target's parent.
+      Path workParent = inputOptions.getAtomicWorkPath();
+      if (workParent == null) {
+        workParent = targetPath.getParent();
+      }
+      final Path workDir = new Path(workParent,
+          WIP_PREFIX + targetPath.getName() + rand.nextInt());
+      final FileSystem workFS = workDir.getFileSystem(configuration);
+      if (!DistCpUtils.compareFs(targetFS, workFS)) {
+        throw new IllegalArgumentException("Work path " + workDir +
+            " and target path " + targetPath + " are in different file system");
+      }
+      CopyOutputFormat.setWorkingDirectory(job, workDir);
+    }
+    CopyOutputFormat.setCommitDirectory(job, targetPath);
+
+    // Without a user-supplied log path, logs go under the meta folder.
+    final Path requestedLogPath = inputOptions.getLogPath();
+    final Path logPath;
+    if (requestedLogPath != null) {
+      LOG.info("DistCp job log path: " + requestedLogPath);
+      logPath = requestedLogPath;
+    } else {
+      logPath = new Path(metaFolder, "_logs");
+    }
+    CopyOutputFormat.setOutputPath(job, logPath);
+  }
+
+  /**
+   * Create input listing by invoking an appropriate copy listing
+   * implementation, chosen from the job configuration and the input options.
+   * The job's credentials are handed to the copy listing (presumably so that
+   * delegation tokens for the paths end up in the job's credential store -
+   * confirm against CopyListing).
+   *
+   * @param job - Handle to job
+   * @return Returns the path where the copy listing is created
+   * @throws IOException - If any
+   */
+  protected Path createInputFileListing(Job job) throws IOException {
+    final Path listingFile = getFileListingPath();
+    final CopyListing listingBuilder = CopyListing.getCopyListing(
+        job.getConfiguration(), job.getCredentials(), inputOptions);
+    listingBuilder.buildListing(listingFile, inputOptions);
+    return listingFile;
+  }
+
+  /**
+   * Get default name of the copy listing file: "fileList.seq" inside the
+   * meta folder, with the resulting URI normalized.
+   *
+   * @return - Path where the copy listing file has to be saved
+   * @throws IOException - Exception if any
+   */
+  protected Path getFileListingPath() throws IOException {
+    final Path rawListingPath = new Path(metaFolder + "/fileList.seq");
+    final String normalized = rawListingPath.toUri().normalize().toString();
+    return new Path(normalized);
+  }
+
+  /**
+   * Compute a default working folder path for the job, under the job staging
+   * directory, using a random suffix. The path is also recorded in the
+   * configuration under DistCpConstants.CONF_LABEL_META_FOLDER.
+   *
+   * @return Returns the working folder information
+   * @throws Exception - Exception if any
+   */
+  private Path createMetaFolderPath() throws Exception {
+    final JobConf jobConf = (JobConf) getConf();
+    final JobClient jobClient = new JobClient(jobConf);
+    final Path stagingDir = JobSubmissionFiles.getStagingDir(jobClient, jobConf);
+
+    final Path metaFolderPath = new Path(stagingDir, PREFIX + rand.nextInt());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Meta folder location: " + metaFolderPath);
+    }
+    jobConf.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
+    return metaFolderPath;
+  }
+
+  /**
+   * Main function of the DistCp program. Registers a shutdown hook that
+   * cleans up after an unsubmitted job, then invokes the DistCp::run() method
+   * via the ToolRunner and exits with its return code.
+   * @param argv Command-line arguments sent to DistCp.
+   */
+  public static void main(String argv[]) {
+    try {
+      final DistCp distCp = new DistCp();
+      final Cleanup cleanupHook = new Cleanup(distCp);
+      Runtime.getRuntime().addShutdownHook(cleanupHook);
+
+      final int exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
+      System.exit(exitCode);
+    } catch (Exception failure) {
+      LOG.error("Couldn't complete DistCp operation: ", failure);
+      System.exit(DistCpConstants.UNKNOWN_ERROR);
+    }
+  }
+
+  /**
+   * Creates a fresh JobConf and adds distcp-default.xml to it as a resource.
+   * @return Configuration which includes properties from distcp-default.xml
+   */
+  private static JobConf getDefaultConf() {
+    final JobConf defaults = new JobConf();
+    defaults.addResource(DISTCP_DEFAULT_XML);
+    return defaults;
+  }
+
+  /**
+   * Deletes the meta folder, recursively, if there is one to delete.
+   * Does nothing when the meta folder is unset or when its file system was
+   * never resolved (execute() failed before assigning jobFS). Without the
+   * latter check, a NullPointerException thrown here from execute()'s finally
+   * block would replace the exception that caused the failure.
+   * A failure to delete is logged and not propagated.
+   */
+  private synchronized void cleanup() {
+    if (metaFolder == null || jobFS == null) {
+      return;
+    }
+    try {
+      jobFS.delete(metaFolder, true);
+      metaFolder = null;
+    } catch (IOException e) {
+      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
+    }
+  }
+
+  /** @return true once execute() has submitted the job. */
+  private boolean isSubmitted() {
+    return this.submitted;
+  }
+
+  /**
+   * Shutdown hook: cleans up the given DistCp instance unless its job has
+   * already been submitted.
+   */
+  private static class Cleanup extends Thread {
+    private final DistCp owner;
+
+    public Cleanup(DistCp distCp) {
+      this.owner = distCp;
+    }
+
+    @Override
+    public void run() {
+      if (!owner.isSubmitted()) {
+        owner.cleanup();
+      }
+    }
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,107 @@
+package org.apache.hadoop.tools.distcp2;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class to hold commonly used constants: default values,
+ * configuration labels and DistCp return codes.
+ */
+public class DistCpConstants {
+
+  /** Default number of maps to use for DistCp */
+  public static final int DEFAULT_MAPS = 20;
+
+  /** Default bandwidth, in MB, if none specified */
+  public static final int DEFAULT_BANDWIDTH_MB = 100;
+
+  /**
+   * Default strategy for copying. Implementation looked up
+   * from distcp-default.xml
+   */
+  public static final String UNIFORMSIZE = "uniformsize";
+
+  /*
+   * Constants mapping to command line switches/input options
+   */
+  public static final String CONF_LABEL_ATOMIC_COPY = "distcp.atomic.copy";
+  public static final String CONF_LABEL_WORK_PATH = "distcp.work.path";
+  public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
+  public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
+  public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
+  public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
+  public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+  public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+  public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
+  public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
+  public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
+  public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
+  public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+  public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+
+  /** Total bytes to be copied. Updated by copylisting. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
+
+  /** Total number of paths to copy, includes directories. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
+
+  /** SSL keystore resource */
+  public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
+
+  /** If input is given with -f (source listing), the file containing the src paths */
+  public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
+
+  /**
+   * Directory where the mapreduce job will write to. If not atomic commit, then same
+   * as CONF_LABEL_TARGET_FINAL_PATH
+   */
+  public static final String CONF_LABEL_TARGET_WORK_PATH = "distcp.target.work.path";
+
+  /**
+   * Directory where the final data will be committed to. If not atomic commit, then same
+   * as CONF_LABEL_TARGET_WORK_PATH
+   */
+  public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
+
+  /**
+   * DistCp job id for consumers of DistCp
+   */
+  public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";
+
+  /** Meta folder where the job's intermediate data is kept */
+  public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+
+  /** DistCp CopyListing class override param */
+  public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
+
+  /**
+   * Conf label for SSL Trust-store location.
+   */
+  public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
+      = "ssl.client.truststore.location";
+
+  /**
+   * Conf label for SSL Key-store location.
+   */
+  public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
+      = "ssl.client.keystore.location";
+
+  /*
+   * Constants for DistCp return code to shell / consumer of ToolRunner's run
+   */
+  public static final int SUCCESS = 0;
+  public static final int INVALID_ARGUMENT = -1;
+  public static final int DUPLICATE_INPUT = -2;
+  public static final int UNKNOWN_ERROR = -999;
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enumeration mapping configuration keys to distcp command line
+ * options. Each constant pairs a configuration label (empty for switches
+ * that have none) with the commons-cli Option describing the switch.
+ */
+public enum DistCpOptionSwitch {
+
+  /**
+   * Ignores any failures during copy, and continues with rest.
+   * Logs failures in a file
+   */
+  IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
+      new Option("i", false, "Ignore failures during copy")),
+
+  /**
+   * Preserves status of file/path in the target.
+   * Default behavior with -p, is to preserve replication,
+   * block size, user, group and permission on the target file
+   *
+   * If any of the optional switches are present among rbugp, then
+   * only the corresponding file attribute is preserved
+   *
+   */
+  PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+      // Trailing space keeps the two halves of the help text apart.
+      new Option("p", true, "preserve status (rbugp) " +
+          "(replication, block-size, user, group, permission)")),
+
+  /**
+   * Update target location by copying only files that are missing
+   * in the target. This can be used to periodically sync two folders
+   * across source and target. Typically used with DELETE_MISSING
+   * Incompatible with ATOMIC_COMMIT
+   */
+  SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
+      // Trailing space keeps "missing" and "files" from running together.
+      new Option("update", false, "Update target, copying only missing " +
+          "files or directories")),
+
+  /**
+   * Deletes files in target that are missing from source
+   * This allows the target to be in sync with the source contents
+   * Typically used in conjunction with SYNC_FOLDERS
+   * Incompatible with ATOMIC_COMMIT
+   */
+  DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
+      new Option("delete", false, "Delete from target, " +
+          "files missing in source")),
+
+  /**
+   * Configuration file to use with hftps:// for securely copying
+   * files across clusters. Typically the configuration file contains
+   * truststore/keystore information such as location, password and type
+   */
+  SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
+      new Option("mapredSslConf", true, "Configuration for ssl config file" +
+          ", to use with hftps://")),
+
+  /**
+   * Max number of maps to use during copy. DistCp will split work
+   * as equally as possible among these maps
+   */
+  MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
+      new Option("m", true, "Max number of concurrent maps to use for copy")),
+
+  /**
+   * Source file listing can be provided to DistCp in a file.
+   * This allows DistCp to copy random list of files from source
+   * and copy them to target
+   */
+  SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
+      new Option("f", true, "List of files that need to be copied")),
+
+  /**
+   * Copy all the source files and commit them atomically to the target
+   * This is typically useful in cases where there is a process
+   * polling for availability of a file/dir. This option is incompatible
+   * with SYNC_FOLDERS and DELETE_MISSING
+   */
+  ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
+      new Option("atomic", false, "Commit all changes or none")),
+
+  /**
+   * Work path to be used only in conjunction with Atomic commit
+   */
+  WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
+      new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
+
+  /**
+   * Log path where distcp output logs are written to
+   */
+  LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
+      new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
+
+  /**
+   * Copy strategy to use. This could be dynamic or uniform size etc.
+   * DistCp would use an appropriate input format based on this.
+   */
+  COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
+      new Option("strategy", true, "Copy strategy to use. Default is " +
+          "dividing work based on file sizes")),
+
+  /**
+   * Skip CRC checks between source and target, when determining what
+   * files need to be copied.
+   */
+  SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
+      new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
+          "source and target paths.")),
+
+  /**
+   * Overwrite target-files unconditionally.
+   */
+  OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
+      new Option("overwrite", false, "Choose to overwrite target files " +
+          "unconditionally, even if they exist.")),
+
+  /**
+   * Should DistCp execution be blocking. Exposed as the "async" switch;
+   * has no configuration label.
+   */
+  BLOCKING("",
+      new Option("async", false, "Should distcp execution be blocking")),
+
+  /**
+   * Deprecated file-count limit switch; has no configuration label.
+   */
+  FILE_LIMIT("",
+      new Option("filelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n")),
+
+  /**
+   * Deprecated size limit switch; has no configuration label.
+   */
+  SIZE_LIMIT("",
+      new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n bytes")),
+
+  /**
+   * Specify bandwidth per map in MB
+   */
+  BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+      new Option("bandwidth", true, "Specify bandwidth per map in MB"));
+
+  private final String confLabel;
+  private final Option option;
+
+  /**
+   * @param confLabel Configuration label for the switch; empty if it has none
+   * @param option CLI option describing the switch
+   */
+  DistCpOptionSwitch(String confLabel, Option option) {
+    this.confLabel = confLabel;
+    this.option = option;
+  }
+
+  /**
+   * Get Configuration label for the option
+   * @return configuration label name
+   */
+  public String getConfigLabel() {
+    return confLabel;
+  }
+
+  /**
+   * Get CLI Option corresponding to the distcp option
+   * @return option
+   */
+  public Option getOption() {
+    return option;
+  }
+
+  /**
+   * Get Switch symbol
+   * @return the option's switch name, as a string
+   */
+  public String getSwitch() {
+    return option.getOpt();
+  }
+
+  /**
+   * @return the constant's name followed by its configuration label and option
+   */
+  @Override
+  public String toString() {
+    return  super.name() + " {" +
+        "confLabel='" + confLabel + '\'' +
+        ", option=" + option + '}';
+  }
+
+  /**
+   * Helper function to add an option to hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   * @param value - Value
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option,
+                               String value) {
+    conf.set(option.getConfigLabel(), value);
+  }
+
+  /**
+   * Helper function to set an option to "true" in hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option) {
+    conf.set(option.getConfigLabel(), "true");
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The Options class encapsulates all DistCp options.
+ * These may be set from command-line (via the OptionsParser)
+ * or may be set manually.
+ */
+public class DistCpOptions {
+
+  private boolean atomicCommit = false;
+  private boolean syncFolder = false;
+  private boolean deleteMissing = false;
+  private boolean ignoreFailures = false;
+  private boolean overwrite = false;
+  private boolean skipCRC = false;
+  private boolean blocking = true;
+
+  private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+
+  private String sslConfigurationFile;
+
+  private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+  private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+
+  private Path atomicWorkPath;
+
+  private Path logPath;
+
+  private Path sourceFileListing;
+  private List<Path> sourcePaths;
+
+  private Path targetPath;
+
+  public static enum FileAttribute{
+    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+
+    public static FileAttribute getAttribute(char symbol) {
+      for (FileAttribute attribute : values()) {
+        if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) {
+          return attribute;
+        }
+      }
+      throw new NoSuchElementException("No attribute for " + symbol);
+    }
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourcePaths List of source-paths (including wildcards)
+   *                     to be copied to target.
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
+    assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourcePaths = sourcePaths;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourceFileListing File containing list of source paths
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(Path sourceFileListing, Path targetPath) {
+    assert sourceFileListing != null : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourceFileListing = sourceFileListing;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Copy constructor.
+   * @param that DistCpOptions being copied from.
+   */
+  public DistCpOptions(DistCpOptions that) {
+    if (this != that && that != null) {
+      this.atomicCommit = that.atomicCommit;
+      this.syncFolder = that.syncFolder;
+      this.deleteMissing = that.deleteMissing;
+      this.ignoreFailures = that.ignoreFailures;
+      this.overwrite = that.overwrite;
+      this.skipCRC = that.skipCRC;
+      this.blocking = that.blocking;
+      this.maxMaps = that.maxMaps;
+      this.mapBandwidth = that.mapBandwidth;
+      this.sslConfigurationFile = that.getSslConfigurationFile();
+      this.copyStrategy = that.copyStrategy;
+      this.preserveStatus = that.preserveStatus;
+      this.atomicWorkPath = that.getAtomicWorkPath();
+      this.logPath = that.getLogPath();
+      this.sourceFileListing = that.getSourceFileListing();
+      this.sourcePaths = that.getSourcePaths();
+      this.targetPath = that.getTargetPath();
+    }
+  }
+
+  /**
+   * Should the data be committed atomically?
+   *
+   * @return true if data should be committed automically. false otherwise
+   */
+  public boolean shouldAtomicCommit() {
+    return atomicCommit;
+  }
+
+  /**
+   * Set if data need to be committed automatically
+   *
+   * @param atomicCommit - boolean switch
+   */
+  public void setAtomicCommit(boolean atomicCommit) {
+    validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit);
+    this.atomicCommit = atomicCommit;
+  }
+
+  /**
+   * Should the data be sync'ed between source and target paths?
+   *
+   * @return true if data should be sync'ed up. false otherwise
+   */
+  public boolean shouldSyncFolder() {
+    return syncFolder;
+  }
+
+  /**
+   * Set if source and target folder contents be sync'ed up
+   *
+   * @param syncFolder - boolean switch
+   */
+  public void setSyncFolder(boolean syncFolder) {
+    validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder);
+    this.syncFolder = syncFolder;
+  }
+
+  /**
+   * Should target files missing in source should be deleted?
+   *
+   * @return true if zoombie target files to be removed. false otherwise
+   */
+  public boolean shouldDeleteMissing() {
+    return deleteMissing;
+  }
+
+  /**
+   * Set if files only present in target should be deleted
+   *
+   * @param deleteMissing - boolean switch
+   */
+  public void setDeleteMissing(boolean deleteMissing) {
+    validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
+    this.deleteMissing = deleteMissing;
+  }
+
+  /**
+   * Should failures be logged and ignored during copy?
+   *
+   * @return true if failures are to be logged and ignored. false otherwise
+   */
+  public boolean shouldIgnoreFailures() {
+    return ignoreFailures;
+  }
+
+  /**
+   * Set if failures during copy be ignored
+   *
+   * @param ignoreFailures - boolean switch
+   */
+  public void setIgnoreFailures(boolean ignoreFailures) {
+    this.ignoreFailures = ignoreFailures;
+  }
+
+  /**
+   * Should DistCp be running in blocking mode
+   *
+   * @return true if should run in blocking, false otherwise
+   */
+  public boolean shouldBlock() {
+    return blocking;
+  }
+
+  /**
+   * Set if Disctp should run blocking or non-blocking
+   *
+   * @param blocking - boolean switch
+   */
+  public void setBlocking(boolean blocking) {
+    this.blocking = blocking;
+  }
+
+  /**
+   * Should files be overwritten always?
+   *
+   * @return true if files in target that may exist before distcp, should always
+   *         be overwritten. false otherwise
+   */
+  public boolean shouldOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * Set if files should always be overwritten on target
+   *
+   * @param overwrite - boolean switch
+   */
+  public void setOverwrite(boolean overwrite) {
+    validate(DistCpOptionSwitch.OVERWRITE, overwrite);
+    this.overwrite = overwrite;
+  }
+
+  /**
+   * Should CRC/checksum check be skipped while checking files are identical
+   *
+   * @return true if checksum check should be skipped while checking files are
+   *         identical. false otherwise
+   */
+  public boolean shouldSkipCRC() {
+    return skipCRC;
+  }
+
+  /**
+   * Set if checksum comparison should be skipped while determining if
+   * source and destination files are identical
+   *
+   * @param skipCRC - boolean switch
+   */
+  public void setSkipCRC(boolean skipCRC) {
+    validate(DistCpOptionSwitch.SKIP_CRC, skipCRC);
+    this.skipCRC = skipCRC;
+  }
+
+  /** Get the max number of maps to use for this copy
+   *
+   * @return Max number of maps
+   */
+  public int getMaxMaps() {
+    return maxMaps;
+  }
+
+  /**
+   * Set the max number of maps to use for copy
+   *
+   * @param maxMaps - Number of maps
+   */
+  public void setMaxMaps(int maxMaps) {
+    this.maxMaps = Math.max(maxMaps, 1);
+  }
+
+  /** Get the map bandwidth in MB
+   *
+   * @return Bandwidth in MB
+   */
+  public int getMapBandwidth() {
+    return mapBandwidth;
+  }
+
+  /**
+   * Set per map bandwidth
+   *
+   * @param mapBandwidth - per map bandwidth
+   */
+  public void setMapBandwidth(int mapBandwidth) {
+    assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
+    this.mapBandwidth = mapBandwidth;
+  }
+
+  /**
+   * Get path where the ssl configuration file is present to use for hftps://
+   *
+   * @return Path on local file system
+   */
+  public String getSslConfigurationFile() {
+    return sslConfigurationFile;
+  }
+
+  /**
+   * Set the SSL configuration file path to use with hftps:// (local path)
+   *
+   * @param sslConfigurationFile - Local ssl config file path
+   */
+  public void setSslConfigurationFile(String sslConfigurationFile) {
+    this.sslConfigurationFile = sslConfigurationFile;
+  }
+
+  /**
+   * Returns an iterator with the list of file attributes to preserve
+   *
+   * @return iterator of file attributes to preserve
+   */
+  public Iterator<FileAttribute> preserveAttributes() {
+    return preserveStatus.iterator();
+  }
+
+  /**
+   * Checks if the input attibute should be preserved or not
+   *
+   * @param attribute - Attribute to check
+   * @return True if attribute should be preserved, false otherwise
+   */
+  public boolean shouldPreserve(FileAttribute attribute) {
+    return preserveStatus.contains(attribute);
+  }
+
+  /**
+   * Add file attributes that need to be preserved. This method may be
+   * called multiple times to add attributes.
+   *
+   * @param fileAttribute - Attribute to add, one at a time
+   */
+  public void preserve(FileAttribute fileAttribute) {
+    for (FileAttribute attribute : preserveStatus) {
+      if (attribute.equals(fileAttribute)) {
+        return;
+      }
+    }
+    preserveStatus.add(fileAttribute);
+  }
+
+  /** Get work path for atomic commit. If null, the work
+   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
+   *
+   * @return Atomic work path on the target cluster. Null if not set
+   */
+  public Path getAtomicWorkPath() {
+    return atomicWorkPath;
+  }
+
+  /**
+   * Set the work path for atomic commit
+   *
+   * @param atomicWorkPath - Path on the target cluster
+   */
+  public void setAtomicWorkPath(Path atomicWorkPath) {
+    this.atomicWorkPath = atomicWorkPath;
+  }
+
+  /** Get output directory for writing distcp logs. Otherwise logs
+   * are temporarily written to JobStagingDir/_logs and deleted
+   * upon job completion
+   *
+   * @return Log output path on the cluster where distcp job is run
+   */
+  public Path getLogPath() {
+    return logPath;
+  }
+
+  /**
+   * Set the log path where distcp output logs are stored
+   * Uses JobStagingDir/_logs by default
+   *
+   * @param logPath - Path where logs will be saved
+   */
+  public void setLogPath(Path logPath) {
+    this.logPath = logPath;
+  }
+
+  /**
+   * Get the copy strategy to use. Uses appropriate input format
+   *
+   * @return copy strategy to use
+   */
+  public String getCopyStrategy() {
+    return copyStrategy;
+  }
+
+  /**
+   * Set the copy strategy to use. Should map to a strategy implementation
+   * in distp-default.xml
+   *
+   * @param copyStrategy - copy Strategy to use
+   */
+  public void setCopyStrategy(String copyStrategy) {
+    this.copyStrategy = copyStrategy;
+  }
+
+  /**
+   * File path (hdfs:// or file://) that contains the list of actual
+   * files to copy
+   *
+   * @return - Source listing file path
+   */
+  public Path getSourceFileListing() {
+    return sourceFileListing;
+  }
+
+  /**
+   * Getter for sourcePaths.
+   * @return List of source-paths.
+   */
+  public List<Path> getSourcePaths() {
+    return sourcePaths;
+  }
+
+  /**
+   * Setter for sourcePaths.
+   * @param sourcePaths The new list of source-paths.
+   */
+  public void setSourcePaths(List<Path> sourcePaths) {
+    assert sourcePaths != null && sourcePaths.size() != 0;
+    this.sourcePaths = sourcePaths;
+  }
+
+  /**
+   * Getter for the targetPath.
+   * @return The target-path.
+   */
+  public Path getTargetPath() {
+    return targetPath;
+  }
+
+  public void validate(DistCpOptionSwitch option, boolean value) {
+
+    boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
+        value : this.syncFolder);
+    boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ?
+        value : this.overwrite);
+    boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ?
+        value : this.deleteMissing);
+    boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ?
+        value : this.atomicCommit);
+    boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
+        value : this.skipCRC);
+
+    if (syncFolder && atomicCommit) {
+      throw new IllegalArgumentException("Atomic commit can't be used with " +
+          "sync folder or overwrite options");
+    }
+
+    if (deleteMissing && !(overwrite || syncFolder)) {
+      throw new IllegalArgumentException("Delete missing is applicable " +
+          "only with update or overwrite options");
+    }
+
+    if (overwrite && syncFolder) {
+      throw new IllegalArgumentException("Overwrite and update options are " +
+          "mutually exclusive");
+    }
+
+    if (!syncFolder && skipCRC) {
+      throw new IllegalArgumentException("Skip CRC is valid only with update options");
+    }
+
+  }
+
+  /**
+   * Add options to configuration. These will be used in the Mapper/committer
+   *
+   * @param conf - Configruation object to which the options need to be added
+   */
+  public void appendToConf(Configuration conf) {
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
+        String.valueOf(atomicCommit));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
+        String.valueOf(ignoreFailures));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
+        String.valueOf(syncFolder));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
+        String.valueOf(deleteMissing));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
+        String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
+        String.valueOf(skipCRC));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
+        String.valueOf(mapBandwidth));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+  }
+
+  /**
+   * Utility to easily string-ify Options, for logging.
+   *
+   * @return String representation of the Options.
+   */
+  @Override
+  public String toString() {
+    return "DistCpOptions{" +
+        "atomicCommit=" + atomicCommit +
+        ", syncFolder=" + syncFolder +
+        ", deleteMissing=" + deleteMissing +
+        ", ignoreFailures=" + ignoreFailures +
+        ", maxMaps=" + maxMaps +
+        ", sslConfigurationFile='" + sslConfigurationFile + '\'' +
+        ", copyStrategy='" + copyStrategy + '\'' +
+        ", sourceFileListing=" + sourceFileListing +
+        ", sourcePaths=" + sourcePaths +
+        ", targetPath=" + targetPath +
+        '}';
+  }
+
+  @Override
+  protected DistCpOptions clone() throws CloneNotSupportedException {
+    return (DistCpOptions) super.clone();
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ * The paths read from that file are handed to a GlobbedCopyListing, which
+ * builds the actual listing.
+ */
+public class FileBasedCopyListing extends CopyListing {
+
+  private final CopyListing globbedListing;
+  /**
+   * Constructor, to initialize base-class.
+   * @param configuration The input Configuration object.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public FileBasedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    globbedListing = new GlobbedCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+    // Intentionally empty: this class performs no validation of its own.
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   *   Iterates over all source paths mentioned in the input-file.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   */
+  @Override
+  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
+    // Work on a copy so the caller's options keep their original source paths.
+    DistCpOptions newOption = new DistCpOptions(options);
+    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, newOption);
+  }
+
+  /**
+   * Reads the source-listing file and turns every line into a Path.
+   * The file is decoded as UTF-8, so the result does not depend on the
+   * default charset of the JVM that runs the tool.
+   * @param sourceListing File that holds one source path per line.
+   * @return The paths in the order in which they appear in the file.
+   * @throws IOException if the file cannot be opened or read.
+   */
+  private List<Path> fetchFileList(Path sourceListing) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = sourceListing.getFileSystem(getConf());
+    BufferedReader input = null;
+    try {
+      input = new BufferedReader(
+          new InputStreamReader(fs.open(sourceListing), "UTF-8"));
+      for (String line = input.readLine(); line != null; line = input.readLine()) {
+        result.add(new Path(line));
+      }
+    } finally {
+      IOUtils.closeStream(input);
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return globbedListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return globbedListing.getNumberOfPaths();
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A CopyListing that expands ("globs") every configured source path,
+ * wild-cards included, and then hands the expanded paths to a
+ * SimpleCopyListing, which writes the listing file.
+ */
+public class GlobbedCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(GlobbedCopyListing.class);
+
+  private final CopyListing simpleListing;
+  /**
+   * Creates the listing together with the SimpleCopyListing it delegates to.
+   * @param configuration The input Configuration object.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public GlobbedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    this.simpleListing = new SimpleCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Expands every source path through FileSystem.globStatus() and builds
+   * the listing from the expanded paths.
+   * @param pathToListingFile The location at which the copy-listing file
+   *                           is to be created.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException if a file system call fails, if there are no source
+   *                     paths, or if a source path matches nothing.
+   */
+  @Override
+  public void doBuildListing(Path pathToListingFile,
+                             DistCpOptions options) throws IOException {
+
+    List<Path> sources = options.getSourcePaths();
+    if (sources.isEmpty()) {
+      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");
+    }
+
+    List<Path> expanded = new ArrayList<Path>();
+    for (Path source : sources) {
+      FileStatus[] matches = source.getFileSystem(getConf()).globStatus(source);
+
+      // A null or empty result means the pattern matched nothing.
+      if (matches == null || matches.length == 0) {
+        throw new InvalidInputException(source + " doesn't exist");
+      }
+      for (FileStatus match : matches) {
+        expanded.add(match.getPath());
+      }
+    }
+
+    // Leave the caller's options untouched; only the copy sees the expanded paths.
+    DistCpOptions optionsGlobbed = new DistCpOptions(options);
+    optionsGlobbed.setSourcePaths(expanded);
+    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return simpleListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return simpleListing.getNumberOfPaths();
+  }
+
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+
+import java.util.*;
+
+/**
+ * The OptionsParser parses out the command-line options passed to DistCp,
+ * and interprets those specific to DistCp, to create an Options object.
+ */
+public class OptionsParser {
+
+  private static final Log LOG = LogFactory.getLog(OptionsParser.class);
+
+  private static final Options cliOptions = new Options();
+
+  static {
+    // Register the CLI option of every known DistCp switch.
+    for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding option " + option.getOption());
+      }
+      cliOptions.addOption(option.getOption());
+    }
+  }
+
+  /**
+   * Parser that rewrites a bare preserve-status switch into "-prbugp"
+   * before the regular GNU-style parsing takes place.
+   */
+  private static class CustomParser extends GnuParser {
+    @Override
+    protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
+      final String bareSwitch = "-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch();
+      // Rewrite a copy: the caller's array must not change under its feet.
+      final String[] expanded = arguments.clone();
+      for (int index = 0; index < expanded.length; index++) {
+        if (expanded[index].equals(bareSwitch)) {
+          expanded[index] = "-prbugp";
+        }
+      }
+      return super.flatten(options, expanded, stopAtNonOption);
+    }
+  }
+
+  /**
+   * The parse method parses the command-line options, and creates
+   * a corresponding Options object.
+   * @param args Command-line arguments (excluding the options consumed
+   *              by the GenericOptionsParser). The array is not modified.
+   * @return The Options object, corresponding to the specified command-line.
+   * @throws IllegalArgumentException Thrown if the parse fails.
+   * @throws NoSuchElementException Thrown if the preserve-status switch names
+   *              a letter that matches no file attribute.
+   */
+  public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
+
+    CommandLineParser parser = new CustomParser();
+
+    CommandLine command;
+    try {
+      command = parser.parse(cliOptions, args, true);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Unable to parse arguments. " +
+        Arrays.toString(args), e);
+    }
+
+    DistCpOptions option;
+    Path targetPath;
+    List<Path> sourcePaths = new ArrayList<Path>();
+
+    String leftOverArgs[] = command.getArgs();
+    if (leftOverArgs == null || leftOverArgs.length < 1) {
+      throw new IllegalArgumentException("Target path not specified");
+    }
+
+    //Last Argument is the target path
+    targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
+
+    //Copy any source paths in the arguments to the list
+    for (int index = 0; index < leftOverArgs.length - 1; index++) {
+      sourcePaths.add(new Path(leftOverArgs[index].trim()));
+    }
+
+    /* If command has source file listing, use it else, fall back on source paths in args
+       If both are present, throw exception and bail */
+    if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
+      if (!sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Both source file listing and source paths present");
+      }
+      option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
+              SOURCE_FILE_LISTING.getSwitch())), targetPath);
+    } else {
+      if (sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Neither source file listing nor source paths present");
+      }
+      option = new DistCpOptions(sourcePaths, targetPath);
+    }
+
+    //Process all the other option switches and set options appropriately
+    if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
+      option.setIgnoreFailures(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) {
+      option.setAtomicCommit(true);
+    }
+
+    //A work path is meaningful only together with atomic commit
+    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+      if (!option.shouldAtomicCommit()) {
+        throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
+      }
+      String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
+      if (workPath != null && !workPath.isEmpty()) {
+        option.setAtomicWorkPath(new Path(workPath));
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+      option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) {
+      option.setSyncFolder(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) {
+      option.setOverwrite(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
+      option.setDeleteMissing(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) {
+      option.setSkipCRC(true);
+    }
+
+    //The switch being present turns blocking mode off
+    if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
+      option.setBlocking(false);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+      //getVal() already trims; a missing value ends up in the catch below
+      String bandwidthString = getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch());
+      try {
+        int mapBandwidth = Integer.parseInt(bandwidthString);
+        if (mapBandwidth <= 0) {
+          throw new IllegalArgumentException("Bandwidth specified is not positive: " +
+              mapBandwidth);
+        }
+        option.setMapBandwidth(mapBandwidth);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+            bandwidthString, e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
+      option.setSslConfigurationFile(command.
+          getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
+      //getVal() already trims; a missing value ends up in the catch below
+      String mapsString = getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch());
+      try {
+        option.setMaxMaps(Integer.parseInt(mapsString));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Number of maps is invalid: " +
+            mapsString, e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+      option.setCopyStrategy(
+            getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+      String attributes =
+          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
+      if (attributes == null || attributes.isEmpty()) {
+        //No letters given: preserve every attribute
+        for (FileAttribute attribute : FileAttribute.values()) {
+          option.preserve(attribute);
+        }
+      } else {
+        //One letter per attribute
+        for (int index = 0; index < attributes.length(); index++) {
+          option.preserve(FileAttribute.
+              getAttribute(attributes.charAt(index)));
+        }
+      }
+    }
+
+    //Deprecated switch: the value is checked for well-formedness, then ignored
+    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+      String fileLimitString = getVal(command,
+                              DistCpOptionSwitch.FILE_LIMIT.getSwitch());
+      try {
+        Integer.parseInt(fileLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("File-limit is invalid: "
+                                            + fileLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    //Deprecated switch: the value is checked for well-formedness, then ignored
+    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+      String sizeLimitString = getVal(command,
+                              DistCpOptionSwitch.SIZE_LIMIT.getSwitch());
+      try {
+        Long.parseLong(sizeLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Size-limit is invalid: "
+                                            + sizeLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    return option;
+  }
+
+  /**
+   * Fetches the value given for a switch, with surrounding whitespace removed.
+   * @param command The parsed command line.
+   * @param swtch The switch whose value is wanted.
+   * @return The trimmed value, or null if the command line holds no value
+   *         for the switch.
+   */
+  private static String getVal(CommandLine command, String swtch) {
+    String optionValue = command.getOptionValue(swtch);
+    if (optionValue == null) {
+      return null;
+    } else {
+      return optionValue.trim();
+    }
+  }
+
+  /**
+   * Prints the usage text, including all registered options, through a
+   * HelpFormatter.
+   */
+  public static void usage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.*;
+import java.util.Stack;
+
+/**
+ * The SimpleCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
+ */
+public class SimpleCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
+
+  private long totalPaths = 0;
+  private long totalBytesToCopy = 0;
+
+  /**
+   * Creates a listing builder; the constructor is protected.
+   *
+   * @param configuration configuration through which the source and target
+   *                      FileSystems may be accessed
+   * @param credentials   credentials on which the FS delegation tokens are
+   *                      cached; when null, delegation token caching is
+   *                      skipped
+   */
+  protected SimpleCopyListing(Configuration configuration,
+                              Credentials credentials) {
+    super(configuration, credentials);
+  }
+
+  /**
+   * Checks that the requested copy is feasible before any listing is built.
+   * Rejected cases: several sources (or a non-file source) aimed at a target
+   * that is a file, an atomic commit onto a target that already exists, and
+   * any source path that does not exist. When credentials are available,
+   * tokens for the source paths are then obtained through TokenCache.
+   *
+   * @param options the DistCp options holding the source and target paths
+   * @throws IOException if a file system cannot be obtained or queried
+   * @throws InvalidInputException if one of the checks above fails
+   */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+
+    Path targetPath = options.getTargetPath();
+    FileSystem targetFS = targetPath.getFileSystem(getConf());
+    boolean targetIsFile = targetFS.isFile(targetPath);
+
+    //If target is a file, then source has to be single file
+    if (targetIsFile) {
+      if (options.getSourcePaths().size() > 1) {
+        throw new InvalidInputException("Multiple source being copied to a file: " +
+            targetPath);
+      }
+
+      Path srcPath = options.getSourcePaths().get(0);
+      FileSystem sourceFS = srcPath.getFileSystem(getConf());
+      if (!sourceFS.isFile(srcPath)) {
+        throw new InvalidInputException("Cannot copy " + srcPath +
+            ", which is not a file to " + targetPath);
+      }
+    }
+
+    if (options.shouldAtomicCommit() && targetFS.exists(targetPath)) {
+      throw new InvalidInputException("Target path for atomic-commit already exists: " +
+        targetPath + ". Cannot atomic-commit to pre-existing target-path.");
+    }
+
+    for (Path path: options.getSourcePaths()) {
+      FileSystem fs = path.getFileSystem(getConf());
+      if (!fs.exists(path)) {
+        throw new InvalidInputException(path + " doesn't exist");
+      }
+    }
+
+    /* This is required to allow map tasks to access each of the source
+       clusters. This would retrieve the delegation token for each unique
+       file system and add them to job's private credential store
+     */
+    Credentials credentials = getCredentials();
+    if (credentials != null) {
+      // Size the array from the list: a fixed one-element array would be
+      // padded with a null entry whenever the list of sources is empty.
+      Path[] inputPaths = options.getSourcePaths().toArray(
+          new Path[options.getSourcePaths().size()]);
+      TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+    SequenceFile.Writer listingWriter = null;
+    try {
+      listingWriter = getWriter(pathToListingFile);
+
+      for (Path rawSource : options.getSourcePaths()) {
+        final FileSystem sourceFS = rawSource.getFileSystem(getConf());
+        final Path source = makeQualified(rawSource);
+
+        final FileStatus rootStatus = sourceFS.getFileStatus(source);
+        final Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+        // Anything other than a plain FileStatus is flagged so that
+        // writeToFileListing converts it before it is written out.
+        final boolean localFile = rootStatus.getClass() != FileStatus.class;
+
+        final FileStatus[] entries = sourceFS.listStatus(source);
+        if (entries == null || entries.length == 0) {
+          // Nothing listed under the source: record the source itself.
+          writeToFileListing(listingWriter, rootStatus, sourcePathRoot,
+              localFile, options);
+          continue;
+        }
+
+        for (FileStatus entry : entries) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recording source-path: " + entry.getPath() + " for copy.");
+          }
+          writeToFileListing(listingWriter, entry, sourcePathRoot,
+              localFile, options);
+
+          if (!isDirectoryAndNotEmpty(sourceFS, entry)) {
+            continue;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing non-empty source dir: " + entry.getPath());
+          }
+          traverseNonEmptyDirectory(listingWriter, entry, sourcePathRoot,
+              localFile, options);
+        }
+      }
+    } finally {
+      // closeStream is invoked even when the writer was never created.
+      IOUtils.closeStream(listingWriter);
+    }
+  }
+
+  /**
+   * Picks the path against which relative paths of the listing entries are
+   * computed: either the source itself or its parent, depending on whether
+   * the source is a lone file, whether the target exists, and on the
+   * sync-folder / overwrite options.
+   *
+   * @param sourceStatus status of one (qualified) source path
+   * @param options the DistCp options in effect
+   * @return the source path itself, or its parent
+   * @throws IOException if the target file system cannot be queried
+   */
+  private Path computeSourceRootPath(FileStatus sourceStatus,
+                                     DistCpOptions options) throws IOException {
+    final Path target = options.getTargetPath();
+    final FileSystem targetFS = target.getFileSystem(getConf());
+    final Path sourcePath = sourceStatus.getPath();
+    final boolean singleSource = options.getSourcePaths().size() == 1;
+
+    // Case 1: exactly one source, and it is a file.
+    if (singleSource && !sourceStatus.isDir()) {
+      final boolean targetIsFileOrAbsent =
+          targetFS.isFile(target) || !targetFS.exists(target);
+      return targetIsFileOrAbsent ? sourcePath : sourcePath.getParent();
+    }
+
+    // Case 2: several sources and/or a directory source.
+    final boolean specialHandling =
+        (singleSource && !targetFS.exists(target))
+            || options.shouldSyncFolder()
+            || options.shouldOverwrite();
+    if (specialHandling && sourceStatus.isDir()) {
+      return sourcePath;
+    }
+    return sourcePath.getParent();
+  }
+
+  /**
+   * Hook that decides whether a path is recorded in the copy listing. This
+   * implementation accepts every path; the method is protected so that a
+   * subclass can exclude files such as
+   * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}.
+   *
+   * @param path path being considered for copy while building the file listing
+   * @param options input options passed during the DistCp invocation
+   * @return true if the path should be considered for copy, false otherwise
+   */
+  protected boolean shouldCopy(Path path, DistCpOptions options) {
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    // Accumulated by writeToFileListing from the lengths of non-directories.
+    return this.totalBytesToCopy;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    // Incremented by writeToFileListing once per recorded entry.
+    return this.totalPaths;
+  }
+
+  /**
+   * Qualifies a path against the URI and working directory of the file
+   * system that the path itself resolves to.
+   *
+   * @param path the path to qualify
+   * @return the qualified path
+   * @throws IOException if the file system cannot be obtained
+   */
+  private Path makeQualified(Path path) throws IOException {
+    final FileSystem owningFS = path.getFileSystem(getConf());
+    final Path qualified =
+        path.makeQualified(owningFS.getUri(), owningFS.getWorkingDirectory());
+    return qualified;
+  }
+
+  /**
+   * Opens an uncompressed SequenceFile writer (Text keys, FileStatus values)
+   * for the listing file, removing any file already present at that path.
+   *
+   * @param pathToListFile location of the listing file
+   * @return a writer positioned on a fresh listing file
+   * @throws IOException if the file system or the writer cannot be set up
+   */
+  private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+    final Configuration conf = getConf();
+    final FileSystem listingFS = pathToListFile.getFileSystem(conf);
+
+    // Start from a clean slate: drop any listing left at this location.
+    if (listingFS.exists(pathToListFile)) {
+      listingFS.delete(pathToListFile, false);
+    }
+
+    return SequenceFile.createWriter(listingFS, conf, pathToListFile,
+        Text.class, FileStatus.class, SequenceFile.CompressionType.NONE);
+  }
+
+  /**
+   * Tells whether a status denotes a directory that has at least one child.
+   *
+   * @param fileSystem the file system used to list the directory
+   * @param fileStatus the entry to examine
+   * @return true only for a directory whose listing is non-null and non-empty
+   * @throws IOException if the listing fails
+   */
+  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+                                    FileStatus fileStatus) throws IOException {
+    if (!fileStatus.isDir()) {
+      return false;
+    }
+    // listStatus may hand back null (doBuildListing already guards against
+    // that); treat a missing listing like an empty directory.
+    FileStatus[] children = getChildren(fileSystem, fileStatus);
+    return children != null && children.length > 0;
+  }
+
+  /**
+   * Lists the entries found directly under the given parent.
+   *
+   * @param fileSystem the file system to query
+   * @param parent status whose path is listed
+   * @return whatever listStatus returns for the parent's path
+   * @throws IOException if the listing fails
+   */
+  private static FileStatus[] getChildren(FileSystem fileSystem,
+                                         FileStatus parent) throws IOException {
+    final Path parentPath = parent.getPath();
+    return fileSystem.listStatus(parentPath);
+  }
+
+  /**
+   * Walks the tree below a directory and records every entry found.
+   * Directories still to be expanded are kept on an explicit stack, so the
+   * walk is iterative rather than recursive.
+   *
+   * @param fileListWriter writer of the copy listing
+   * @param sourceStatus the directory at which the walk starts
+   * @param sourcePathRoot root against which relative paths are computed
+   * @param localFile passed through unchanged to writeToFileListing
+   * @param options the DistCp options in effect
+   * @throws IOException if listing or writing fails
+   */
+  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+                                         FileStatus sourceStatus,
+                                         Path sourcePathRoot,
+                                         boolean localFile,
+                                         DistCpOptions options)
+                                         throws IOException {
+    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+    Stack<FileStatus> pathStack = new Stack<FileStatus>();
+    pathStack.push(sourceStatus);
+
+    while (!pathStack.isEmpty()) {
+      FileStatus[] children = getChildren(sourceFS, pathStack.pop());
+      if (children == null) {
+        // No listing available for this directory (listStatus can return
+        // null, as doBuildListing already anticipates); nothing to record.
+        continue;
+      }
+      for (FileStatus child: children) {
+        // Log the entry actually being processed, not the traversal root.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recording source-path: "
+                    + child.getPath() + " for copy.");
+        }
+        writeToFileListing(fileListWriter, child, sourcePathRoot,
+             localFile, options);
+        if (isDirectoryAndNotEmpty(sourceFS, child)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing non-empty source dir: "
+                       + child.getPath());
+          }
+          pathStack.push(child);
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends one entry (relative path, status) to the copy listing and
+   * updates the path and byte counters. A directory equal to the source
+   * root is skipped, as is any path rejected by shouldCopy.
+   *
+   * @param fileListWriter writer of the copy listing
+   * @param fileStatus the entry to record
+   * @param sourcePathRoot root against which the relative path is computed
+   * @param localFile when true, the status is converted through
+   *                  getFileStatus before being written
+   * @param options the DistCp options in effect
+   * @throws IOException if writing to the listing fails
+   */
+  private void writeToFileListing(SequenceFile.Writer fileListWriter,
+                                  FileStatus fileStatus,
+                                  Path sourcePathRoot,
+                                  boolean localFile,
+                                  DistCpOptions options) throws IOException {
+    final Path fullPath = fileStatus.getPath();
+    if (fullPath.equals(sourcePathRoot) && fileStatus.isDir()) {
+      return; // Skip the root-paths.
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+        fullPath) + ", FULL PATH: " + fullPath);
+    }
+
+    // Statuses flagged as local are converted before being written.
+    final FileStatus toRecord = localFile ? getFileStatus(fileStatus) : fileStatus;
+
+    if (!shouldCopy(fullPath, options)) {
+      return;
+    }
+
+    final Text relativePath =
+        new Text(DistCpUtils.getRelativePath(sourcePathRoot, fullPath));
+    fileListWriter.append(relativePath, toRecord);
+    fileListWriter.sync();
+
+    totalPaths++;
+    if (!fileStatus.isDir()) {
+      // Only non-directories contribute to the byte count.
+      totalBytesToCopy += fileStatus.getLen();
+    }
+  }
+
+  // Scratch buffers for getFileStatus. Both are per-instance, so separate
+  // SimpleCopyListing objects never write into the same buffer.
+  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
+  private final DataInputBuffer in = new DataInputBuffer();
+
+  /**
+   * Builds a plain FileStatus from the given status by writing it out with
+   * write() and reading the bytes back with readFields().
+   *
+   * @param fileStatus the status to convert
+   * @return a new FileStatus populated from the serialized form
+   * @throws IOException if writing or reading the serialized form fails
+   */
+  private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
+    FileStatus status = new FileStatus();
+
+    // Reuse the scratch buffer: discard whatever the previous call left.
+    buffer.reset();
+    DataOutputStream out = new DataOutputStream(buffer);
+    fileStatus.write(out);
+
+    in.reset(buffer.toByteArray(), 0, buffer.size());
+    status.readFields(in);
+    return status;
+  }
+}

Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.GlobbedCopyListing;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+/**
+ * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
+ * responsible for handling the completion/cleanup of the DistCp run.
+ * Specifically, it does the following:
+ *  1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.)
+ *  2. Preservation of user/group/replication-factor on any directories that
+ *     have been copied. (Files are taken care of in their map-tasks.)
+ *  3. Atomic-move of data from the temporary work-folder to the final path
+ *     (if atomic-commit was opted for).
+ *  4. Deletion of files from the target that are missing at source (if opted for).
+ *  5. Cleanup of any partially copied files, from previous, failed attempts.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+  private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+
+  private final TaskAttemptContext taskAttemptContext;
+
+  /**
+   * Creates an output committer.
+   *
+   * @param outputPath the job's output path
+   * @param context    the task's context; kept for later status and
+   *                   progress reporting
+   * @throws IOException - Exception if any
+   */
+  public CopyCommitter(Path outputPath,
+                       TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    taskAttemptContext = context;
+  }
+
+  /**
+   * Commits the job: runs the superclass commit, removes leftover temp
+   * files, preserves directory attributes when requested, then either
+   * deletes target entries missing at the source or atomically commits the
+   * data. The meta folder is cleaned up whether or not these steps succeed.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    Configuration conf = jobContext.getConfiguration();
+    super.commitJob(jobContext);
+
+    cleanupTempFiles(jobContext);
+
+    // Attribute preservation sits inside the try as well, so that the meta
+    // folder is still removed when preservation throws.
+    try {
+      String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+      if (attributes != null && !attributes.isEmpty()) {
+        preserveFileAttributesForDirectories(conf);
+      }
+
+      if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+        deleteMissing(conf);
+      } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
+        commitData(conf);
+      }
+      taskAttemptContext.setStatus("Commit Successful");
+    }
+    finally {
+      cleanup(conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void abortJob(JobContext jobContext,
+                       JobStatus.State state) throws IOException {
+    try {
+      super.abortJob(jobContext, state);
+    } finally {
+      // Runs whether or not the superclass abort succeeded.
+      cleanupTempFiles(jobContext);
+      final Configuration conf = jobContext.getConfiguration();
+      cleanup(conf);
+    }
+  }
+
+  /**
+   * Best-effort removal of this job's temp files from the target work path
+   * and from its parent. Any failure is logged as a warning and swallowed.
+   *
+   * @param context the job whose id selects the temp files to delete
+   */
+  private void cleanupTempFiles(JobContext context) {
+    try {
+      final Configuration conf = context.getConfiguration();
+      final String workPathName = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
+      final Path workPath = new Path(workPathName);
+      final FileSystem targetFS = workPath.getFileSystem(conf);
+      final String jobId = context.getJobID().toString();
+
+      // Look both in the work path itself and one level above it.
+      deleteAttemptTempFiles(workPath, targetFS, jobId);
+      deleteAttemptTempFiles(workPath.getParent(), targetFS, jobId);
+    } catch (Throwable t) {
+      LOG.warn("Unable to cleanup temp files", t);
+    }
+  }
+
+  /**
+   * Deletes (non-recursively) every entry directly under the given folder
+   * whose name matches ".distcp.tmp." followed by the job id with "job"
+   * replaced by "attempt".
+   *
+   * @param targetWorkPath folder in which to look for temp files
+   * @param targetFS file system holding that folder
+   * @param jobId id of the job, in its string form
+   * @throws IOException if globbing or deleting fails
+   */
+  private void deleteAttemptTempFiles(Path targetWorkPath,
+                                      FileSystem targetFS,
+                                      String jobId) throws IOException {
+    final String attemptGlob =
+        ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*";
+    final FileStatus[] leftovers =
+        targetFS.globStatus(new Path(targetWorkPath, attemptGlob));
+
+    if (leftovers == null) {
+      return;
+    }
+    for (FileStatus leftover : leftovers) {
+      LOG.info("Cleaning up " + leftover.getPath());
+      targetFS.delete(leftover.getPath(), false);
+    }
+  }
+
+  /**
+   * Recursively deletes the meta folder named in the configuration. An
+   * IOException raised while doing so is logged and not propagated.
+   *
+   * @param conf - Job Configuration
+   */
+  private void cleanup(Configuration conf) {
+    final String metaFolderName = conf.get(DistCpConstants.CONF_LABEL_META_FOLDER);
+    final Path metaFolder = new Path(metaFolderName);
+    try {
+      final FileSystem metaFS = metaFolder.getFileSystem(conf);
+      LOG.info("Cleaning up temporary work folder: " + metaFolder);
+      metaFS.delete(metaFolder, true);
+    } catch (IOException e) {
+      // Best effort: a failed cleanup is reported but never rethrown.
+      LOG.error("Exception encountered ", e);
+    }
+  }
+
+  /**
+   * Changes the target directories' file attributes (owner, user/group
+   * permissions, etc.) based on the corresponding source directories, by
+   * replaying the source listing. Non-directory entries are skipped, as is
+   * the target root itself.
+   *
+   * @param conf the job configuration, from which the attributes to
+   *             preserve, the listing file and the target work path are read
+   * @throws IOException if the listing cannot be read or an attribute
+   *                     cannot be applied
+   */
+  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+    String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+    LOG.info("About to preserve attributes: " + attrSymbols);
+
+    EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
+
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    long preservedEntries = 0;
+    SequenceFile.Reader sourceReader = null;
+    try {
+      // Opened inside the try so the reader is closed even when a later
+      // set-up step (such as the length lookup) throws.
+      sourceReader = new SequenceFile.Reader(clusterFS, sourceListing, conf);
+      long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
+
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+
+      // Iterate over every source path that was copied.
+      while (sourceReader.next(srcRelPath, srcFileStatus)) {
+        // File-attributes for files are set at the time of copy,
+        // in the map-task.
+        if (! srcFileStatus.isDir()) continue;
+
+        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+
+        // Skip the root folder.
+        // Status can't be preserved on root-folder. (E.g. multiple paths may
+        // be copied to a single target folder. Which source-attributes to use
+        // on the target is undefined.)
+        if (targetRoot.equals(targetFile)) continue;
+
+        FileSystem targetFS = targetFile.getFileSystem(conf);
+        DistCpUtils.preserve(targetFS, targetFile, srcFileStatus,  attributes);
+        // Count the directory so the summary below reports the real total.
+        preservedEntries++;
+
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Preserving status on directory entries. [" +
+            sourceReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+    }
+    LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
+  }
+
+  /**
+   * Deletes "extra" entries from the target, i.e. those that are not
+   * available at the source. Both the source listing and a freshly built
+   * target listing are sorted and then walked side by side; every target
+   * entry without a matching source entry is removed.
+   *
+   * @param conf the job configuration, from which the listing file and the
+   *             final target path are read
+   * @throws IOException if a listing cannot be built or read, or if a
+   *                     target entry cannot be deleted
+   */
+  private void deleteMissing(Configuration conf) throws IOException {
+    LOG.info("-delete option is enabled. About to remove entries from " +
+        "target that are missing in source");
+
+    // Sort the source-file listing alphabetically.
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+
+    // Similarly, create the listing of target-files. Sort alphabetically.
+    Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
+    CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+
+    List<Path> targets = new ArrayList<Path>(1);
+    Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    targets.add(targetFinalPath);
+    DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
+
+    target.buildListing(targetListing, options);
+    Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+    long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
+
+    // Walk both source and target file listings.
+    // Delete all from target that doesn't also exist on source.
+    long deletedEntries = 0;
+    SequenceFile.Reader sourceReader = null;
+    SequenceFile.Reader targetReader = null;
+    try {
+      // Opened inside the try so that the first reader is still closed
+      // when opening the second one fails.
+      sourceReader = new SequenceFile.Reader(
+          clusterFS, sortedSourceListing, conf);
+      targetReader = new SequenceFile.Reader(
+          clusterFS, sortedTargetListing, conf);
+
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      FileStatus trgtFileStatus = new FileStatus();
+      Text trgtRelPath = new Text();
+
+      FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+      boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+      while (targetReader.next(trgtRelPath, trgtFileStatus)) {
+        // Skip sources that don't exist on target.
+        while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
+          srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+        }
+
+        if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
+
+        // Target doesn't exist at source. Delete.
+        // An entry that is already gone counts as deleted.
+        boolean result = (!targetFS.exists(trgtFileStatus.getPath()) ||
+            targetFS.delete(trgtFileStatus.getPath(), true));
+        if (result) {
+          LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
+          deletedEntries++;
+        } else {
+          throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+        }
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Deleting missing files from target. [" +
+            targetReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+      IOUtils.closeStream(targetReader);
+    }
+    LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+  }
+
+  /**
+   * Moves the temporary work folder onto the final target path by renaming
+   * it. Fails up front when both the final path and the work folder exist.
+   * A rename that reports failure is still accepted when the final path
+   * exists and the work folder is gone.
+   *
+   * @param conf the job configuration, from which the work path and the
+   *             final path are read
+   * @throws IOException if the final path pre-exists or the move fails
+   */
+  private void commitData(Configuration conf) throws IOException {
+    final Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    final Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    final FileSystem targetFS = workDir.getFileSystem(conf);
+
+    LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);
+    if (targetFS.exists(finalDir) && targetFS.exists(workDir)) {
+      LOG.error("Pre-existing final-path found at: " + finalDir);
+      throw new IOException("Target-path can't be committed to because it " +
+          "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
+    }
+
+    boolean committed = targetFS.rename(workDir, finalDir);
+    if (!committed) {
+      LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
+      committed = targetFS.exists(finalDir) && !targetFS.exists(workDir);
+    }
+
+    if (!committed) {
+      LOG.error("Unable to commit data to " + finalDir);
+      throw new IOException("Atomic commit failed. Temporary data in " + workDir +
+        ", Unable to move to " + finalDir);
+    }
+
+    LOG.info("Data committed successfully to " + finalDir);
+    taskAttemptContext.setStatus("Data committed successfully to " + finalDir);
+  }
+}