Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/09/06 09:04:32 UTC

svn commit: r573166 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java

Author: omalley
Date: Thu Sep  6 00:04:31 2007
New Revision: 573166

URL: http://svn.apache.org/viewvc?rev=573166&view=rev
Log:
HADOOP-1569.  Fixes DistCP to use the standard FileSystem interface. 
Contributed by Chris Douglas.
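
The new implementation drops the per-protocol mapper classes in favor of
resolving every source and destination path against its own FileSystem.
A minimal sketch of that pattern, assuming a configured Hadoop client
(the s3:// URI below is illustrative, not from the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Any registered scheme (hdfs://, s3://, file://) dispatches to the
    // matching FileSystem implementation through the same call.
    Configuration conf = new Configuration();
    Path src = new Path("s3://bucket/dir/file");
    FileSystem fs = src.getFileSystem(conf);
    boolean present = fs.exists(src);

This is the same Path.getFileSystem(Configuration) call the mapper and
driver below rely on, which is what lets one code path serve all of the
supported file systems.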

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep  6 00:04:31 2007
@@ -163,6 +163,9 @@
     HADOOP-1425.  Replace uses of ToolBase with the Tool interface.
     (Enis Soztutar via cutting)
 
+    HADOOP-1569.  Reimplement DistCP to use the standard FileSystem/URI
+    code in Hadoop so that you can copy from and to all of the supported file 
+    systems. (Chris Douglas via omalley)
 
 Release 0.14.1 - 2007-09-04
 
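Since distcp is now a standard Tool, the cross-filesystem copy described in
the entry above can be driven programmatically as well as from the command
line. A minimal sketch, assuming a reachable namenode and an S3 bucket (the
hostname and bucket name below are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.CopyFiles;
    import org.apache.hadoop.util.ToolRunner;

    // Equivalent to: hadoop distcp -update <srcurl> <desturl>
    Configuration conf = new Configuration();
    int rc = ToolRunner.run(new CopyFiles(conf), new String[] {
        "-update",
        "hdfs://nn.example.com:8020/user/foo/bar",  // hypothetical source
        "s3://example-bucket/user/foo/bar"});       // hypothetical destination
    System.exit(rc);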

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Sep  6 00:04:31 2007
@@ -18,24 +18,25 @@
 
 package org.apache.hadoop.util;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
+import java.util.Stack;
 import java.util.StringTokenizer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -43,39 +44,54 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
 
 /**
  * A Map-reduce program to recursively copy directories between
  * different file-systems.
  */
 public class CopyFiles implements Tool {
-  private static final String HDFS = "hdfs";
-  private static final String S3 = "s3";
-  
-  private static final String usage = "distcp "+
-    "[-i] <srcurl> | -f <urilist_uri> <desturl> [-log <logpath>]";
-  
-  private static final long MIN_BYTES_PER_MAP = 1L << 28;
-  private static final int MAX_NUM_MAPS = 10000;
-  private static final int MAX_MAPS_PER_NODE = 10;
-  
-  private static final String readFailuresAttribute = 
-    "distcp.ignore.read.failures";
-  
+  private static final Log LOG = LogFactory.getLog(CopyFiles.class);
+
+  private static final String usage =
+    "distcp [OPTIONS] <srcurl>* <desturl>" +
+    "\n\nOPTIONS:" +
+    "\n-p                     Preserve status" +
+    "\n-i                     Ignore failures" +
+    "\n-log <logdir>          Write logs to <logdir>" +
+    "\n-overwrite             Overwrite destination" +
+    "\n-update                Overwrite if src modif time later than dst" +
+    "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
+    "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
+    "\n      interpreted as an isomorphic update to an existing directory." +
+    "\nFor example:" +
+    "\nhadoop distcp -p -update \"hdfs://A:8020/user/foo/bar\" " +
+    "\"hdfs://B:8020/user/foo/baz\"\n" +
+    "\n     would update all descendants of 'baz' also in 'bar'; it would " +
+    "\n     *not* update /user/foo/baz/bar\n";
+
+  private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
+  private static final int MAX_MAPS_PER_NODE = 20;
+
+  private static final int SYNC_FILE_MAX = 10;
+
   private JobConf conf;
-  
+
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
@@ -83,723 +99,354 @@
       this.conf = new JobConf(conf);
     }
   }
-  
+
   public Configuration getConf() {
     return conf;
   }
-  
-  public CopyFiles() {
-  }
-  
+
+  @Deprecated
+  public CopyFiles() { }
+
   public CopyFiles(Configuration conf) {
     setConf(conf);
   }
-  
+
   /**
-   * Base-class for all mappers for distcp
+   * An input/output pair of filenames.
    */
-  public static abstract class CopyFilesMapper extends MapReduceBase 
-  {
-    /**
-     * Interface to initialize *distcp* specific map tasks.
-     * @param conf : The dfs/mapred configuration.
-     * @param jobConf : The handle to the jobConf object to be initialized.
-     * @param srcPaths : The source paths.
-     * @param destPath : The destination path.
-     * @param logPath : The log path.
-     * @param ignoreReadFailures : Ignore read failures?
-     * @throws IOException
-     */
-    public abstract void setup(Configuration conf, JobConf jobConf, 
-                               String[] srcPaths, String destPath,
-                               Path logPath, boolean ignoreReadFailures) 
-      throws IOException;
-    
+  static class FilePair implements Writable {
+    FileStatus input = new FileStatus();
+    Path output;
+    FilePair() { }
+    FilePair(FileStatus input, Path output) {
+      this.input = input;
+      this.output = output;
+    }
+    public void readFields(DataInput in) throws IOException {
+      input.readFields(in);
+      output = new Path(Text.readString(in));
+    }
+    public void write(DataOutput out) throws IOException {
+      input.write(out);
+      Text.writeString(out, output.toString());
+    }
+  }
+
+  /**
+   * InputFormat of a distcp job responsible for generating splits of the src
+   * file list.
+   */
+  static class CopyInputFormat implements InputFormat {
+
     /**
-     * Interface to cleanup *distcp* specific resources
-     * @param conf : The dfs/mapred configuration.
-     * @param jobConf : The handle to the jobConf object to be initialized.
-     * @param srcPath : The source uri.
-     * @param destPath : The destination uri.
-     * @throws IOException
+     * Does nothing.
      */
-    public abstract void cleanup(Configuration conf, JobConf jobConf, 
-                                 String srcPath, String destPath) throws IOException;
-    
+    public void validateInput(JobConf job) throws IOException { }
+
     /**
-     * Make a path relative with respect to a root path.
-     * absPath is always assumed to descend from root.
-     * Otherwise returned path is null.
+     * Produce splits such that each is no greater than the quotient of the
+     * total size and the number of splits requested.
+     * @param job The handle to the JobConf object
+     * @param numSplits Number of splits requested
      */
-    public static Path makeRelative(Path root, Path absPath) {
-      if (!absPath.isAbsolute()) { return absPath; }
-      String sRoot = root.toUri().getPath();
-      String sPath = absPath.toUri().getPath();
-      Enumeration<Object> rootTokens = new StringTokenizer(sRoot, "/");
-      ArrayList rList = Collections.list(rootTokens);
-      Enumeration<Object> pathTokens = new StringTokenizer(sPath, "/");
-      ArrayList pList = Collections.list(pathTokens);
-      Iterator rIter = rList.iterator();
-      Iterator pIter = pList.iterator();
-      while (rIter.hasNext()) {
-        String rElem = (String) rIter.next();
-        String pElem = (String) pIter.next();
-        if (!rElem.equals(pElem)) { return null; }
-      }
-      StringBuffer sb = new StringBuffer();
-      while (pIter.hasNext()) {
-        String pElem = (String) pIter.next();
-        sb.append(pElem);
-        if (pIter.hasNext()) { sb.append("/"); }
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      int cnfiles = job.getInt("distcp.file.count", -1);
+      long cbsize = job.getLong("distcp.total.size", -1);
+      String srcfilelist = job.get("distcp.src.list", "");
+      if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
+        throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
+                                   ") total_size(" + cbsize + ") listuri(" +
+                                   srcfilelist + ")");
+      }
+      Path src = new Path(srcfilelist);
+      FileSystem fs = src.getFileSystem(job);
+      FileStatus srcst = fs.getFileStatus(src);
+
+      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      FilePair value = new FilePair();
+      final long targetsize = cbsize / numSplits;
+      long pos = 0L;
+      long last = 0L;
+      long acc = 0L;
+      long cbrem = srcst.getLen();
+      for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, job);
+           sl.next(key, value); last = sl.getPosition()) {
+        // if adding this split would put this split past the target size,
+        // cut the last split and put this next file in the next split.
+        if (acc + key.get() > targetsize && acc != 0) {
+          long splitsize = last - pos;
+          splits.add(new FileSplit(src, pos, splitsize, job));
+          cbrem -= splitsize;
+          pos = last;
+          acc = 0L;
+        }
+        acc += key.get();
+      }
+      if (cbrem != 0) {
+        splits.add(new FileSplit(src, pos, cbrem, job));
       }
-      return new Path(sb.toString());
+
+      return splits.toArray(new FileSplit[splits.size()]);
     }
+
     /**
-     * Calculate how many maps to run.
-     * Ideal number of maps is one per file (if the map-launching overhead
-     * were 0). It is limited by jobtrackers handling capacity which, lets say,
-     * is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
-     * small files it is better to determine number of maps by 
-     * amount of data per map.
-     * @param initialEstimate Initial guess at number of maps (e.g. count of
-     * files).
-     * @param totalBytes Count of total bytes for job (If not known, pass -1).
-     * @param client
-     * @return Count of maps to run.
-     * @throws IOException
+     * Returns a reader for this split of the src file list.
      */
-    public int getMapCount(final int initialEstimate, final long totalBytes,
-                           final JobClient client)
-      throws IOException {
-      int numMaps = initialEstimate;
-      if (numMaps > MAX_NUM_MAPS) {
-        numMaps = MAX_NUM_MAPS;
-      }
-      if (totalBytes != -1 &&
-          numMaps > (int)(totalBytes / MIN_BYTES_PER_MAP)) {
-        numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
-      }
-      ClusterStatus cluster = client.getClusterStatus();
-      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
-      if (numMaps > tmpMaps) {
-        numMaps = tmpMaps;
-      }
-      if (numMaps == 0) {
-        numMaps = 1;
-      }
-      return numMaps;
-    } 
+    public RecordReader getRecordReader(InputSplit split, JobConf job,
+                                 Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader(job, (FileSplit)split);
+    }
+  }
+
+  /**
+   * Return true if dst should be replaced by src and the update flag is set.
+   * Right now, this merely checks that the src and dst len are not equal. This
+   * should be improved on once modification times, CRCs, etc. can
+   * be meaningful in this context.
+   */
+  private static boolean needsUpdate(FileStatus src, FileStatus dst) {
+    return src.getLen() != dst.getLen();
   }
-  
+
   /**
-   * DFSCopyFilesMapper: The mapper for copying files from the DFS.
+   * FSCopyFilesMapper: The mapper for copying files between FileSystems.
    */
-  public static class FSCopyFilesMapper extends CopyFilesMapper 
-    implements Mapper<Text, Writable, WritableComparable, Text> 
-  {
-    private int sizeBuf = 4096;
-    private FileSystem srcFileSys = null;
+  public static class FSCopyFilesMapper
+      implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
+    // config
+    private int sizeBuf = 128 * 1024;
     private FileSystem destFileSys = null;
-    private Path srcPath = null;
+    private boolean ignoreReadFailures;
+    private boolean preserve_status;
+    private boolean overwrite;
+    private boolean update;
     private Path destPath = null;
     private byte[] buffer = null;
-    private static final long reportInterval = 1L << 25;
+    private JobConf job;
+
+    // stats
+    private static final long reportInterval = BYTES_PER_MAP / 8;
     private long bytesSinceLastReport = 0L;
     private long totalBytesCopied = 0L;
-    private static DecimalFormat percentFormat = new DecimalFormat("0.00");
-    private boolean ignoreReadFailures;
-    
-    private void copy(String src, Reporter reporter) throws IOException {
-      // open source file
-      Path srcFile = new Path(srcPath, src);
-      FSDataInputStream in = srcFileSys.open(srcFile);
-      FileStatus srcFileStatus = srcFileSys.getFileStatus(srcFile);
-      long totalBytes = srcFileStatus.getLen();
-      
-      // create directories to hold destination file and create destFile
-      Path destFile = new Path(destPath, src);
-      Path destParent = destFile.getParent();
-      if (destParent != null) { 
-        if (!destFileSys.mkdirs(destParent)) {
-          throw new IOException("Mkdirs failed to create " + 
-                                destParent.toString()); 
-        }
-      }
-      FSDataOutputStream out = destFileSys.create(destFile);
-      
-      // copy file
-      while (true) {
-        int nread = in.read(buffer);
-        if (nread < 0) { break; }
-        out.write(buffer, 0, nread);
-        bytesSinceLastReport += nread;
-        if (bytesSinceLastReport > reportInterval) {
-          totalBytesCopied += bytesSinceLastReport;
-          bytesSinceLastReport = 0L;
-          reporter.setStatus("Copy "+ src + ": " + 
-                             percentFormat.format(100.0 * totalBytesCopied / 
-                                                  totalBytes) +
-                             "% and " +
-                             StringUtils.humanReadableInt(totalBytesCopied) +
-                             " bytes");
-        }
-      }
-      
-      in.close();
-      out.close();
-      // report at least once for each file
-      totalBytesCopied += bytesSinceLastReport;
-      bytesSinceLastReport = 0L;
-      reporter.setStatus("Finished. Bytes copied: " + 
-                         StringUtils.humanReadableInt(totalBytesCopied));
-    }
-    
+    private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
+
     /**
-     * Initialize DFSCopyFileMapper specific job-configuration.
-     * @param conf : The dfs/mapred configuration.
-     * @param jobConf : The handle to the jobConf object to be initialized.
-     * @param srcPaths : The source URIs.
-     * @param destPath : The destination URI.
-     * @param logPath : The log Path.
-     * @param ignoreReadFailures : Ignore read failures?
+     * Copy a file to a destination.
+     * @param srcstat src path and metadata
+     * @param dstpath dst path
+     * @param reporter
      */
-    @Override
-    public void setup(Configuration conf, JobConf jobConf, 
-                      String[] srcPaths, String destPath, 
-                      Path logPath, boolean ignoreReadFailures) 
-      throws IOException
-    {
-      URI srcURI = toURI(srcPaths[0]);
-      URI destURI = toURI(destPath);
-      
-      FileSystem srcfs = FileSystem.get(srcURI, conf);
-      jobConf.set("copy.src.fs", srcURI.toString());
-      jobConf.set("copy.dest.fs", destURI.toString());
-      
-      String srcPath = srcURI.getPath();
-      if ("".equals(srcPath)) { srcPath = "/"; }
-      destPath = destURI.getPath();
-      if ("".equals(destPath)) { destPath = "/"; }
-      
-      Path tmpPath = new Path(srcPath);
-      Path rootPath = new Path(srcPath);
-      if (srcfs.isFile(tmpPath)) {
-        tmpPath = tmpPath.getParent();
-        rootPath = rootPath.getParent();
-        jobConf.set("copy.src.path", tmpPath.toString());
-      } else {
-        jobConf.set("copy.src.path", srcPath);
-      }
-      jobConf.set("copy.dest.path", destPath);
-      
-      if (!srcfs.exists(tmpPath)) {
-        System.out.println(srcPath+" does not exist.");
+    private void copy(FileStatus srcstat, Path dstpath, Reporter reporter)
+        throws IOException {
+
+      int totfiles = job.getInt("distcp.file.count", -1);
+      assert totfiles >= 0 : "Invalid file count " + totfiles;
+
+      // if a directory, ensure created even if empty
+      if (srcstat.isDir()) {
+        if (!destFileSys.mkdirs(dstpath)) {
+          throw new IOException("Failed to create " + dstpath);
+        }
+        // TODO: when modification times can be set, directories should be
+        // emitted to reducers so they might be preserved. Also, mkdirs does
+        // not currently return an error when the directory already exists;
+        // if this changes, all directory work might as well be done in reduce
         return;
       }
-      
-      // turn off speculative execution, because DFS doesn't handle
-      // multiple writers to the same file.
-      jobConf.setSpeculativeExecution(false);
-      jobConf.setInputFormat(SequenceFileInputFormat.class);
-      
-      jobConf.setMapperClass(FSCopyFilesMapper.class);
-      
-      jobConf.setNumReduceTasks(0);
-      jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
-      
-      Random r = new Random();
-      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_"
-                                   + Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
-      Path inDir = new Path(jobDirectory, "in");
-      FileSystem fileSys = FileSystem.get(jobConf);
-      if (!fileSys.mkdirs(inDir)) {
-        throw new IOException("Mkdirs failed to create " +
-                              inDir.toString());
-      }
-      jobConf.set("distcp.job.dir", jobDirectory.toString());
-      
-      jobConf.setInputPath(inDir);
-      jobConf.setOutputPath(logPath);
-      
-      // create new sequence-files for holding paths
-      ArrayList<Path> pathList = new ArrayList<Path>();
-      ArrayList<String> finalPathList = new ArrayList<String>();
-      pathList.add(new Path(srcPath));
-      long totalBytes = 0;
-      //int part = 0;
-      while(!pathList.isEmpty()) {
-        Path top = pathList.remove(0);
-        if (srcfs.isFile(top)) {
-          totalBytes += srcfs.getFileStatus(top).getLen();
-          top = makeRelative(rootPath, top);
-          finalPathList.add(top.toString());
-        } else {
-          Path[] paths = srcfs.listPaths(top);
-          for (int idx = 0; idx < paths.length; idx++) {
-            pathList.add(paths[idx]);
-          }
+      Path destParent = dstpath.getParent();
+      if (totfiles > 1) {
+        // create directories to hold destination file
+        if (destParent != null && !destFileSys.mkdirs(destParent)) {
+          throw new IOException("mkdirs failed to create " + destParent);
         }
+      } else {
+        // Copying a single file; use dst path provided by user as destination
+        // rather than destination directory
+        dstpath = destParent;
       }
-      
-      // ideal number of maps is one per file (if the map-launching overhead
-      // were 0. It is limited by jobtrackers handling capacity, which lets say
-      // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
-      // small files it is better to determine number of maps by amount of 
-      // data per map.
-      int numMaps = finalPathList.size();
-      if (numMaps > MAX_NUM_MAPS) {
-        numMaps = MAX_NUM_MAPS;
-      }
-
-      JobClient client = new JobClient(jobConf);
-      jobConf.setNumMapTasks(getMapCount(numMaps, totalBytes, client));
-      
-      for(int idx=0; idx < numMaps; ++idx) {
-        Path file = new Path(inDir, "part"+idx);
-        SequenceFile.Writer writer = 
-          SequenceFile.createWriter(fileSys, conf, file, Text.class, Text.class);
-        for (int ipath = idx; ipath < finalPathList.size(); ipath += numMaps) {
-          String path = finalPathList.get(ipath);
-          writer.append(new Text(path), new Text(""));
+
+      FSDataInputStream in = null;
+      FSDataOutputStream out = null;
+      try {
+        if (destFileSys.exists(dstpath)
+           && (!overwrite && !(update
+               && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
+          reporter.setStatus("Skipped " + srcstat.getPath());
+          return;
         }
-        writer.close();
-      }
-      finalPathList = null;
-      
-    }
-    
-    @Override
-    public void cleanup(Configuration conf, JobConf jobConf, 
-                        String srcPath, String destPath) 
-      throws IOException
-    {
-      //Clean up jobDirectory
-      Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
-      FileSystem fs = FileSystem.get(jobConf);
-      
-      if (!jobDirectory.equals(new Path("/"))) {
-        FileUtil.fullyDelete(fs, jobDirectory);
+        // open src file
+        in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
+        long totalBytes = srcstat.getLen();
+        // open dst file
+        out = preserve_status
+          ? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
+             srcstat.getBlockSize(), reporter)
+          : destFileSys.create(dstpath, reporter);
+        // copy file
+        int nread;
+        while ((nread = in.read(buffer)) >= 0) {
+          out.write(buffer, 0, nread);
+          bytesSinceLastReport += nread;
+          if (bytesSinceLastReport > reportInterval) {
+            totalBytesCopied += bytesSinceLastReport;
+            bytesSinceLastReport = 0L;
+            reporter.setStatus("Copy " + dstpath + ": " +
+                pcntfmt.format(100.0 * totalBytesCopied / totalBytes) + "% and "
+                + StringUtils.humanReadableInt(totalBytesCopied) + " bytes");
+          }
+        }
+      } finally {
+        if (in != null)
+          in.close();
+        if (out != null)
+          out.close();
       }
+      // report at least once for each file
+      totalBytesCopied += bytesSinceLastReport;
+      bytesSinceLastReport = 0L;
+      reporter.setStatus("Finished. Bytes copied: " +
+                         StringUtils.humanReadableInt(totalBytesCopied));
     }
-    
+
     /** Mapper configuration.
      * Extracts source and destination file system, as well as
      * top-level paths on source and destination directories.
      * Gets the named file systems, to be used later in map.
      */
-    @Override
-    public void configure(JobConf job) 
+    public void configure(JobConf job)
     {
-      String srcfs = job.get("copy.src.fs", "local");
-      String destfs = job.get("copy.dest.fs", "local");
-      srcPath = new Path(job.get("copy.src.path", "/"));
       destPath = new Path(job.get("copy.dest.path", "/"));
       try {
-        srcFileSys = FileSystem.get(new URI(srcfs), job);
-        destFileSys = FileSystem.get(new URI(destfs), job);
-      } catch (URISyntaxException e) {
-        throw new RuntimeException("Failed parse of src or dest URI.", e);
+        destFileSys = destPath.getFileSystem(job);
       } catch (IOException ex) {
         throw new RuntimeException("Unable to get the named file system.", ex);
       }
-      sizeBuf = job.getInt("copy.buf.size", 4096);
+      sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
       buffer = new byte[sizeBuf];
-      ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
+      ignoreReadFailures = job.getBoolean("distcp.ignore.read.failures", false);
+      preserve_status = job.getBoolean("distcp.preserve.status.info", false);
+      update = job.getBoolean("distcp.overwrite.ifnewer", false);
+      overwrite = !update && job.getBoolean("distcp.overwrite.always", false);
+      this.job = job;
     }
-    
+
     /** Map method. Copies one file from source file system to destination.
-     * @param key source file name
-     * @param value not-used.
-     * @param out not-used.
+     * @param key src len
+     * @param value FilePair (FileStatus src, Path dst)
+     * @param out Log of failed copies
      * @param reporter
      */
-    public void map(Text key,
-                    Writable value,
+    public void map(LongWritable key,
+                    FilePair value,
                     OutputCollector<WritableComparable, Text> out,
                     Reporter reporter) throws IOException {
-      String src = key.toString();
+      FileStatus srcstat = value.input;
+      Path dstpath = value.output;
       try {
-        copy(src, reporter);
+        copy(srcstat, dstpath, reporter);
       } catch (IOException except) {
-        out.collect(null, new Text("Failed to copy " + src + " : " +
-                           StringUtils.stringifyException(except)));
+        out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
+              " : " + StringUtils.stringifyException(except)));
         if (ignoreReadFailures) {
-          reporter.setStatus("Failed to copy " + src + " : " + 
-                             StringUtils.stringifyException(except));
+          reporter.setStatus("Failed to copy " + srcstat.getPath() + " : " +
+              StringUtils.stringifyException(except));
           try {
-            destFileSys.delete(new Path(destPath, src));
+            destFileSys.delete(dstpath);
           } catch (Throwable ex) {
             // ignore, we are just cleaning up
+            LOG.debug("Ignoring cleanup exception", ex);
           }
         } else {
           throw except;
         }
       }
     }
-    
-    @Override
-    public void close() {
-      // nothing
-    }
-    
-  }
-  
-  public static class HTTPCopyFilesMapper extends CopyFilesMapper 
-    implements Mapper 
-  {
-    private URI srcURI = null;
-    private FileSystem destFileSys = null;
-    private Path destPath = null;
-    private JobConf jobConf = null;
-    private boolean ignoreReadFailures;
-    
-    /**
-     * Initialize HTTPCopyFileMapper specific job.
-     * @param conf : The dfs/mapred configuration.
-     * @param jobConf : The handle to the jobConf object to be initialized.
-     * @param srcPaths : The source URI.
-     * @param destPath : The destination URI.
-     * @param logPath : The log Path.
-     * @param ignoreReadFailures : Ignore read failures?
-     */
-    @Override
-    public void setup(Configuration conf, JobConf jobConf, 
-                      String[] srcPaths, String destPath, 
-                      Path logPath, boolean ignoreReadFailures) 
-      throws IOException
-    {
-      //Destination
-      URI destURI = toURI(destPath);
-      jobConf.set("copy.dest.fs", destURI.toString());
-      destPath = destURI.getPath();
-      jobConf.set("copy.dest.path", destPath);
-
-      //Setup the MR-job configuration
-      jobConf.setSpeculativeExecution(false);
-      
-      jobConf.setInputFormat(SequenceFileInputFormat.class);
-      
-      jobConf.setMapperClass(HTTPCopyFilesMapper.class);
-      
-      JobClient client = new JobClient(jobConf);
-      jobConf.setNumMapTasks(getMapCount(srcPaths.length, -1, client));
-      
-      jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
-      
-      FileSystem fileSystem = FileSystem.get(conf);
-      Random r = new Random();
-      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + 
-                                   Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
-      Path jobInputDir = new Path(jobDirectory, "in");
-      if (!fileSystem.mkdirs(jobInputDir)) {
-        throw new IOException("Mkdirs failed to create " + jobInputDir.toString());
-      }
-      jobConf.setInputPath(jobInputDir);
-      
-      jobConf.set("distcp.job.dir", jobDirectory.toString());
-      jobConf.setOutputPath(logPath);
-      
-      for(int i=0; i < srcPaths.length; ++i) {
-        Path ipFile = new Path(jobInputDir, "part" + i);
-        SequenceFile.Writer writer = 
-          SequenceFile.createWriter(fileSystem, conf, ipFile,
-                                    Text.class, Text.class);
-        writer.append(new Text(srcPaths[i]), new Text(""));
-        writer.close();
-      }
-    }	
-    
-    @Override
-    public void cleanup(Configuration conf, JobConf jobConf, 
-                        String srcPath, String destPath) 
-      throws IOException
-    {
-      //Clean up jobDirectory
-      Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
-      FileSystem fs = FileSystem.get(jobConf);
-      
-      if (!jobDirectory.equals(new Path("/"))) {
-        FileUtil.fullyDelete(fs, jobDirectory);
-      }
-    }
-    
-    @Override
-    public void configure(JobConf job)
-    {
-      //Save jobConf
-      jobConf = job;
-      
-      try {
-        //Destination
-        destFileSys = FileSystem.get(URI.create(job.get("copy.dest.fs", "file:///")), job);
-        destPath = new Path(job.get("copy.dest.path", "/"));
-        if (!destFileSys.exists(destPath)) {
-          return;
-        }
-      } catch(IOException ioe) {
-        return;
-      }
-      
-      ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
-    }
-    
-    public void map(WritableComparable key,
-                    Writable val,
-                    OutputCollector out,
-                    Reporter reporter) throws IOException 
-    {
-      //The url of the file
-      try {
-        srcURI = new URI(((Text)key).toString());
-        
-        //Construct the complete destination path
-        File urlPath = new File(srcURI.getPath());
-        Path destinationPath = new Path(destPath, urlPath.getName());
-        
-        //Copy the file 
-        URL url = srcURI.toURL();
-        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
-        connection.setRequestMethod("GET");
-        connection.connect();
-        
-        int bufferSize = jobConf.getInt("io.file.buffer.size", 4096);
-        byte[] buffer = new byte[bufferSize];
-        BufferedInputStream is = 
-          new BufferedInputStream(connection.getInputStream());
-        
-        FSDataOutputStream os = destFileSys.create(destinationPath, true, 
-                                                   bufferSize, (short)jobConf.getInt("dfs.replication", 3), 
-                                                   jobConf.getLong("dfs.block.size", 67108864));
-        
-        int readBytes = 0;
-        while((readBytes = is.read(buffer, 0, bufferSize)) != -1) {
-          os.write(buffer, 0, readBytes);
-        }
-        
-        is.close();
-        os.close();
-        connection.disconnect();
-        
-        reporter.setStatus("Copied: " + srcURI.toString() + 
-                           " to: " + destinationPath.toString());
-        
-      } catch (URISyntaxException e) {
-        handleException(reporter, (Text)key, e);
-      } catch (IOException ioe) {
-        handleException(reporter,(Text)key, ioe);
-      }
-    }
 
-    /* handle exceptions */
-    private void handleException( Reporter reporter, Text key, Throwable e )
-    throws IOException {
-      String errMsg = "Failed to copy from: " + key;
-      reporter.setStatus(errMsg);
-      if ( !ignoreReadFailures ) {
-        throw new IOException(errMsg);
-      }
-    }
+    public void close() { }
+
   }
-    
-  /**
-   * Factory to create requisite Mapper objects for distcp.
-   */
-  private static class CopyMapperFactory
-  {
-    public static CopyFilesMapper getMapper(Configuration conf, String protocol)
-      throws IOException
-    {
-      CopyFilesMapper mapper = null;
-      if (protocol == null) {
-        // Use 'default' filesystem.
-        protocol = FileSystem.get(conf).getUri().getScheme();
-      }
-      protocol = protocol.toLowerCase();
-      
-      if (HDFS.equalsIgnoreCase(protocol) || "file".equalsIgnoreCase(protocol) ||
-          S3.equalsIgnoreCase(protocol)) {
-        mapper = new FSCopyFilesMapper();
-      } else if ("http".equalsIgnoreCase(protocol)) {
-        mapper = new HTTPCopyFilesMapper();
-      }
-      
-      return mapper;
-    }
-  }
-  
-  private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException
-  {
-    ArrayList<String> uris = new ArrayList<String>();
-    BufferedReader fis = null;
-    
-    String srcListURIScheme = srcListURI.getScheme();
-    String srcListURIPath = srcListURI.getPath();
-    
-    if ("file".equalsIgnoreCase(srcListURIScheme)) {
-      fis = new BufferedReader(new FileReader(srcListURIPath));
-    } else if (srcListURIScheme != null &&
-               HDFS.equalsIgnoreCase(srcListURIScheme)) {
-      FileSystem fs = FileSystem.get(srcListURI, conf);
-      fis = new BufferedReader(
-                               new InputStreamReader(fs.open(new Path(srcListURIPath)))
-                               );
-    } else if ("http".equalsIgnoreCase(srcListURIScheme)) {
-      //Copy the file 
-      URL url = srcListURI.toURL();
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
-      connection.setRequestMethod("GET");
-      connection.connect();
-      
-      fis = new BufferedReader(
-                               new InputStreamReader(connection.getInputStream())
-                               );
-    } else {
-      throw new IOException("Unsupported source list uri: " + srcListURIScheme);
-    }
 
+  private static List<Path> fetchFileList(Configuration conf, Path srcList)
+      throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = srcList.getFileSystem(conf);
+    DataInputStream raw = fs.open(srcList);
+    BufferedReader input = null;
     try {
-      String uri = null;
-      while((uri = fis.readLine()) != null) {
-        if (!uri.startsWith("#")) {
-          // Check source is parseable as URI by passing via getPathURI.
-          toURI(uri);
-          uris.add(uri);
-        }
-      }
-    } catch (Exception e) {
-      if (fis != null) {
-        fis.close();
+      input = new BufferedReader(new InputStreamReader(raw));
+      String line = input.readLine();
+      while (line != null) {
+        result.add(new Path(line));
+        line = input.readLine();
       }
     } finally {
-      if (fis != null) {
-        fis.close();
+      if (input != null) {
+        input.close();
       }
     }
-
-    return !uris.isEmpty()? uris.toArray(new String[0]): null;
+    return result;
   }
-  
-  /**
-   * Helper function to parse input file and return source urls for 
-   * a given protocol.
-   * @param protocol The protocol for which to find source urls.
-   * @param inputFilePath The file containing the urls.
-   * @return
-   */
-  private static String[] parseInputFile(String protocol, String[] uris)
-    throws IOException
-  {
-    ArrayList<String> protocolURIs = new ArrayList<String>(uris.length);
-    
-    for(int i=0; i < uris.length; ++i) {
-      // uri must start w/ protocol 
-      if (uris[i].startsWith(protocol)) {
-        protocolURIs.add(uris[i]);
-      }
-    }
-    
-    return !protocolURIs.isEmpty()? protocolURIs.toArray(new String[0]): null;
-  }
-  
-  public static URI toURI(final String u) throws IOException {
-    URI result = null;
-    try {
-      result = new URI(u);
-    } catch (URISyntaxException ex) {
-      throw new IOException("Path does not parse as URI: " + u);
+
+  @Deprecated
+  public static void copy(Configuration conf, String srcPath,
+                          String destPath, Path logPath,
+                          boolean srcAsList, boolean ignoreReadFailures)
+      throws IOException {
+    final Path src = new Path(srcPath);
+    List<Path> tmp = new ArrayList<Path>();
+    if (srcAsList) {
+      tmp.addAll(fetchFileList(conf, src));
+    } else {
+      tmp.add(src);
     }
-    return result;
+    EnumSet<cpOpts> flags = ignoreReadFailures
+      ? EnumSet.of(cpOpts.IGNORE_READ_FAILURES)
+      : EnumSet.noneOf(cpOpts.class);
+    copy(conf, tmp, new Path(destPath), logPath, flags);
   }
-  
+
   /**
    * Driver to copy srcPath to destPath depending on required protocol.
-   * @param conf Configuration
-   * @param srcPath Source path URL
-   * @param destPath Destination path URL
-   * @param logPath the log path
-   * @param srcAsList List of source URLs to copy.
-   * @param ignoreReadFailures True if we are to ignore read failures.
+   * @param srcPaths list of source paths
+   * @param destPath Destination path
+   * @param logPath Log output directory
+   * @param flags Command-line flags
    */
-  public static void copy(Configuration conf, String srcPath,
-                          String destPath, Path logPath,
-                          boolean srcAsList, boolean ignoreReadFailures) 
-    throws IOException
-  {
+  public static void copy(Configuration conf, List<Path> srcPaths,
+      Path destPath, Path logPath,
+      EnumSet<cpOpts> flags) throws IOException {
     //Job configuration
-    JobConf jobConf = new JobConf(conf, CopyFiles.class);
-    jobConf.setJobName("distcp");
-    
-    //Sanity check for srcPath/destPath
-    URI srcURI = toURI(srcPath);
-    toURI(destPath);
-  
-    // default logPath
-    if (logPath == null) {
-      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
-                         System.currentTimeMillis());
-    }
+    JobConf job = new JobConf(conf, CopyFiles.class);
+    job.setJobName("distcp");
 
-    //Create the task-specific mapper 
-    CopyFilesMapper mapper = null;
-    String[] srcPaths = null;
-    if (srcAsList) {
-      //Ugly?!
-      
-      //Source paths
-      srcPaths = fetchSrcURIs(conf, srcURI);  
-      
-      // Protocol - 'hdfs://'
-      String[] dfsUrls = parseInputFile(HDFS, srcPaths);
-      if (dfsUrls != null) {
-        for(int i=0; i < dfsUrls.length; ++i) {
-          copy(conf, dfsUrls[i], destPath, logPath, false, ignoreReadFailures);
-        }
-      }
-      
-      // Protocol - 'file://'
-      String[] localUrls = parseInputFile("file", srcPaths);
-      if (localUrls != null) {
-        for(int i=0; i < localUrls.length; ++i) {
-          copy(conf, localUrls[i], destPath, logPath, false, ignoreReadFailures);
-        }
-      }
-      
-      // Protocol - 'http://'
-      String[] httpUrls = parseInputFile("http", srcPaths);
-      if (httpUrls != null) {
-        srcPaths = httpUrls;
-        mapper = CopyMapperFactory.getMapper(conf, "http");
-      } else {   
-        // Protocol - 's3://'
-        String[] s3Urls = parseInputFile(S3, srcPaths);
-        if (s3Urls != null) {
-          srcPaths = s3Urls;
-          mapper = CopyMapperFactory.getMapper(conf, S3);
-        } else {   
-          return;
-        }
+    //Sanity check for srcPath/destPath
+    List<IOException> rslt = new ArrayList<IOException>();
+    for (Path p : srcPaths) {
+      FileSystem fs = p.getFileSystem(conf);
+      if (!fs.exists(p)) {
+        rslt.add(new IOException("Input source " + p + " does not exist."));
       }
-      
-      // TODO: Add support for URIs w/o scheme (In this case, use the 'default'
-      // filesystem).
-      
-    } else {
-      //Single source - ugly!
-      srcPaths = new String [] {srcPath};
-      mapper = CopyMapperFactory.getMapper(conf, srcURI.getScheme());
     }
-    
+    if (!rslt.isEmpty()) {
+      throw new InvalidInputException(rslt);
+    }
+
     //Initialize the mapper
-    mapper.setup(conf, jobConf, srcPaths, destPath, logPath, ignoreReadFailures);
-    
-    //We are good to go!
     try {
-      JobClient.runJob(jobConf);
+      setup(conf, job, srcPaths, destPath, logPath, flags);
+      JobClient.runJob(job);
     } finally {
-      mapper.cleanup(conf, jobConf, srcPath, destPath);
+      cleanup(conf, job);
     }
   }
-  
+
+  enum cpOpts { IGNORE_READ_FAILURES,
+                PRESERVE_STATUS,
+                OVERWRITE,
+                UPDATE }
+
   /**
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
@@ -809,84 +456,255 @@
    * reduce is empty.
    */
   public int run(String[] args) throws Exception {
-    String srcPath = null;
-    String destPath = null;
+    List<Path> srcPath = new ArrayList<Path>();
+    Path destPath = null;
     Path logPath = null;
-    boolean ignoreReadFailures = false;
-    boolean srcAsList = false;
-    
+    EnumSet<cpOpts> flags = EnumSet.noneOf(cpOpts.class);
+
     for (int idx = 0; idx < args.length; idx++) {
       if ("-i".equals(args[idx])) {
-        ignoreReadFailures = true;
+        flags.add(cpOpts.IGNORE_READ_FAILURES);
+      } else if ("-p".equals(args[idx])) {
+        flags.add(cpOpts.PRESERVE_STATUS);
+      } else if ("-overwrite".equals(args[idx])) {
+        flags.add(cpOpts.OVERWRITE);
+      } else if ("-update".equals(args[idx])) {
+        flags.add(cpOpts.UPDATE);
       } else if ("-f".equals(args[idx])) {
-        srcAsList = true;
-      } else if (srcPath == null) {
-        srcPath = args[idx];
-      } else if (destPath == null) {
-        destPath = args[idx];
+        if (++idx == args.length) {
+          System.out.println("urilist_uri not specified");
+          System.out.println(usage);
+          return -1;
+        }
+        srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
+
       } else if ("-log".equals(args[idx])) {
-        logPath = new Path(toURI(args[++idx]).getPath());
-      } else {
+        if (++idx == args.length) {
+          System.out.println("logdir not specified");
+          System.out.println(usage);
+          return -1;
+        }
+        logPath = new Path(args[idx]);
+      } else if ('-' == args[idx].codePointAt(0)) {
+        System.out.println("Invalid switch " + args[idx]);
         System.out.println(usage);
         ToolRunner.printGenericCommandUsage(System.out);
         return -1;
+      } else if (idx == args.length - 1) {
+        destPath = new Path(args[idx]);
+      } else {
+        srcPath.add(new Path(args[idx]));
       }
     }
-    
     // mandatory command-line parameters
-    if (srcPath == null || destPath == null) {
+    if (srcPath.isEmpty() || destPath == null) {
+      System.out.println("Missing " + (destPath == null ? "dst path" : "src"));
       System.out.println(usage);
       ToolRunner.printGenericCommandUsage(System.out);
       return -1;
     }
-  
-    // default logPath
-    if (logPath == null) {
-      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
-                         System.currentTimeMillis());
-      System.out.println("Using default logPath: " + logPath);
-    }
-
-    // verify if srcPath, destPath are valid and logPath is valid and doesnot exist
-    try {
-      URI srcURI = toURI(srcPath);
-      FileSystem srcfs = FileSystem.get(srcURI, conf);
-      if (!srcfs.exists(new Path(srcURI.getPath()))) {
-        System.out.println(srcPath + " does not exist.");
-        return -1;
-      }
-
-      URI destURI = toURI(destPath);
-      FileSystem destfs = FileSystem.get(destURI, conf);
-      if (destfs.exists(new Path(destURI.getPath()))) {
-        System.out.println("WARNING: " + destPath + " already exists.");
-      }
-
-      FileSystem logfs = FileSystem.get(logPath.toUri(), conf);
-      if (logfs.exists(logPath)) {
-        System.out.println("ERROR: " + logPath + " already exists.");
-        return -1;
-      }
-    } catch (Exception e) {
-      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
+    // incompatible command-line flags
+    if (flags.contains(cpOpts.OVERWRITE) && flags.contains(cpOpts.UPDATE)) {
+      System.out.println("Conflicting overwrite policies");
+      System.out.println(usage);
       return -1;
     }
-
     try {
-      copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures);
+      copy(conf, srcPath, destPath, logPath, flags);
     } catch (Exception e) {
-      System.err.println("Copy failed: "+StringUtils.stringifyException(e));
+      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
       return -1;
     }
-    
     return 0;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    JobConf job = new JobConf(new Configuration(), CopyFiles.class);
-    CopyFiles distcp = new CopyFiles();
-    distcp.setConf(job);
+    JobConf job = new JobConf(CopyFiles.class);
+    CopyFiles distcp = new CopyFiles(job);
     int res = ToolRunner.run(distcp, args);
     System.exit(res);
   }
+
+  /**
+   * Make a path relative with respect to a root path.
+   * absPath is always assumed to descend from root.
+   * Otherwise returned path is null.
+   */
+  public static Path makeRelative(Path root, Path absPath) {
+    if (!absPath.isAbsolute()) { return absPath; }
+    String sRoot = root.toUri().getPath();
+    String sPath = absPath.toUri().getPath();
+    Enumeration<Object> rootTokens = new StringTokenizer(sRoot, "/");
+    ArrayList rList = Collections.list(rootTokens);
+    Enumeration<Object> pathTokens = new StringTokenizer(sPath, "/");
+    ArrayList pList = Collections.list(pathTokens);
+    Iterator rIter = rList.iterator();
+    Iterator pIter = pList.iterator();
+    while (rIter.hasNext()) {
+      String rElem = (String) rIter.next();
+      String pElem = (String) pIter.next();
+      if (!rElem.equals(pElem)) { return null; }
+    }
+    StringBuffer sb = new StringBuffer();
+    while (pIter.hasNext()) {
+      String pElem = (String) pIter.next();
+      sb.append(pElem);
+      if (pIter.hasNext()) { sb.append("/"); }
+    }
+    return new Path(sb.toString());
+  }
+
+  /**
+   * Calculate how many maps to run.
+   * The number of maps is the cumulative size of the copy divided by
+   * BYTES_PER_MAP, capped at MAX_MAPS_PER_NODE per node in the cluster,
+   * and at least one.
+   * @param totalBytes Count of total bytes for job
+   * @param numNodes the number of nodes in cluster
+   * @return Count of maps to run.
+   */
+  private static int getMapCount(long totalBytes, int numNodes) {
+    int numMaps = (int)(totalBytes / BYTES_PER_MAP);
+    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+    return Math.max(numMaps, 1);
+  }
+  /**
+   * Delete the temporary dir containing the src file list.
+   * @param conf The dfs/mapred configuration
+   * @param jobConf The handle to the jobConf object
+   */
+  private static void cleanup(Configuration conf, JobConf jobConf)
+      throws IOException {
+    //Clean up jobDirectory
+    String jobDirName = jobConf.get("distcp.job.dir");
+    if (jobDirName != null) {
+      Path jobDirectory = new Path(jobDirName);
+      FileSystem fs = jobDirectory.getFileSystem(jobConf);
+      FileUtil.fullyDelete(fs, jobDirectory);
+    }
+  }
+
+  /**
+   * Initialize DFSCopyFileMapper specific job-configuration.
+   * @param conf : The dfs/mapred configuration.
+   * @param jobConf : The handle to the jobConf object to be initialized.
+   * @param srcPaths : The source URIs.
+   * @param destPath : The destination URI.
+   * @param logPath : Log output directory
+   * @param flags : Command-line flags
+   */
+  private static void setup(Configuration conf, JobConf jobConf, 
+                            List<Path> srcPaths, Path destPath, 
+                            Path logPath, EnumSet<cpOpts> flags)
+      throws IOException {
+    boolean update;
+    boolean overwrite;
+    jobConf.set("copy.dest.path", destPath.toUri().toString());
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+
+    jobConf.setInputFormat(CopyInputFormat.class);
+
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(Text.class);
+
+    jobConf.setMapperClass(FSCopyFilesMapper.class);
+
+    jobConf.setNumReduceTasks(0);
+    jobConf.setBoolean("distcp.ignore.read.failures",
+        flags.contains(cpOpts.IGNORE_READ_FAILURES));
+    jobConf.setBoolean("distcp.preserve.status.info",
+        flags.contains(cpOpts.PRESERVE_STATUS));
+    jobConf.setBoolean("distcp.overwrite.ifnewer",
+        update = flags.contains(cpOpts.UPDATE));
+    jobConf.setBoolean("distcp.overwrite.always",
+        overwrite = !update && flags.contains(cpOpts.OVERWRITE));
+
+    Random r = new Random();
+    String randomId = Integer.toString(r.nextInt(Integer.MAX_VALUE), 36);
+    Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
+    jobConf.set("distcp.job.dir", jobDirectory.toString());
+    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
+    jobConf.set("distcp.src.list", srcfilelist.toString());
+
+    // default logPath
+    FileSystem dstfs = destPath.getFileSystem(conf);
+    if (logPath == null) {
+      FileStatus stat = dstfs.getFileStatus(destPath);
+      String filename = "_distcp_logs_" + randomId;
+      if (!stat.isDir()) {
+        logPath = new Path(destPath.getParent(), filename);        
+      } else {
+        logPath = new Path(destPath, filename);
+      }
+    }
+    jobConf.setOutputPath(logPath);
+
+    // create src list
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+        jobDirectory.getFileSystem(jobConf), jobConf, srcfilelist,
+        LongWritable.class, FilePair.class,
+        SequenceFile.CompressionType.NONE);
+
+    int cnfiles = 0;
+    long cbsize = 0L;
+    try {
+      // handle the case where the destination directory doesn't exist
+      // and we've only a single src directory OR we're updating/overwriting
+      // the contents of the destination directory.
+      final boolean special_case =
+        (srcPaths.size() == 1 && !dstfs.exists(destPath))
+        || update || overwrite;
+      int cnsyncf = 0;
+      long cbsyncs = 0L;
+      for (Path p : srcPaths) {
+        Path root = p.getParent();
+        FileSystem fs = p.getFileSystem(conf);
+
+        if (special_case && fs.getFileStatus(p).isDir()) {
+          root = p;
+        }
+
+        Stack<Path> pathstack = new Stack<Path>();
+        pathstack.push(p);
+        while (!pathstack.empty()) {
+          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
+            if (stat.isDir()) {
+              pathstack.push(stat.getPath());
+            } else {
+              ++cnsyncf;
+              cbsyncs += stat.getLen();
+              ++cnfiles;
+              cbsize += stat.getLen();
+            }
+            if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+              writer.sync();
+              cnsyncf = 0;
+              cbsyncs = 0L;
+            }
+            writer.append(new LongWritable(stat.isDir() ? 0 : stat.getLen()),
+                          new FilePair(stat, new Path(destPath,
+                              makeRelative(root, stat.getPath()))));
+          }
+        }
+      }
+    } finally {
+      writer.close();
+    }
+
+    // create dest path dir if copying > 1 file
+    if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
+      throw new IOException("Failed to create " + destPath);
+    }
+
+    jobConf.setInt("distcp.file.count", cnfiles);
+    jobConf.setLong("distcp.total.size", cbsize);
+
+    JobClient client = new JobClient(jobConf);
+    jobConf.setNumMapTasks(getMapCount(cbsize,
+          client.getClusterStatus().getTaskTrackers()));
+  }
+
 }

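The core of CopyInputFormat.getSplits above is the greedy, size-balanced
partitioning of the src file list. The same accumulation logic, restated as
a standalone sketch over plain byte counts (a hypothetical helper, using
file indices where the real code uses SequenceFile positions):

    import java.util.ArrayList;
    import java.util.List;

    // Plan (startIndex, fileCount) ranges over files listed in src-list
    // order so each range's cumulative size stays near total/numSplits.
    // As in getSplits, a file that would push a non-empty split past the
    // target starts the next split instead; the remainder forms the last.
    static List<long[]> planSplits(long[] fileSizes, int numSplits) {
      long total = 0L;
      for (long sz : fileSizes) total += sz;
      final long target = total / numSplits;  // assumes numSplits > 0
      List<long[]> splits = new ArrayList<long[]>();
      long acc = 0L;   // bytes accumulated in the current split
      int start = 0;   // first file index of the current split
      for (int i = 0; i < fileSizes.length; i++) {
        if (acc + fileSizes[i] > target && acc != 0) {
          splits.add(new long[] {start, i - start});
          start = i;
          acc = 0L;
        }
        acc += fileSizes[i];
      }
      if (start < fileSizes.length) {
        splits.add(new long[] {start, fileSizes.length - start});
      }
      return splits;
    }

A split may exceed the target when a single file is larger than the
quotient, which matches the behavior of the committed code.
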
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Sep  6 00:04:31 2007
@@ -50,12 +50,13 @@
     private static String[] dirNames = {
       "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
     };
-    private String name = "";
-    private int size;
-    private long seed;
-    
+    private final String name;
+    private int size = 0;
+    private long seed = 0L;
+
     MyFile() {
       int nLevels = gen.nextInt(MAX_LEVELS);
+      String xname = "";
       if (nLevels != 0) {
         int[] levels = new int[nLevels];
         for (int idx = 0; idx < nLevels; idx++) {
@@ -66,20 +67,23 @@
           sb.append(dirNames[levels[idx]]);
           sb.append("/");
         }
-        name = sb.toString();
+        xname = sb.toString();
       }
-      long fidx = -1;
-      while (fidx < 0) { fidx = gen.nextLong(); }
-      name = name + Long.toString(fidx);
-      size = gen.nextInt(MAX_SIZE);
-      seed = gen.nextLong();
+      long fidx = gen.nextLong() & Long.MAX_VALUE;
+      name = xname + Long.toString(fidx);
+      reset();
+    }
+    void reset() {
+      final int oldsize = size;
+      do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size);
+      final long oldseed = seed;
+      do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed);
     }
-    
     String getName() { return name; }
     int getSize() { return size; }
     long getSeed() { return seed; }
   }
-  
+
   public TestCopyFiles(String testName) {
     super(testName);
   }
@@ -126,7 +130,7 @@
     
     return files;
   }
-  
+
   /** check if the files have been copied correctly. */
   private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
     throws IOException {
@@ -155,7 +159,67 @@
     
     return true;
   }
-  
+
+  private static void updateFiles(String fsname, String topdir, MyFile[] files,
+        int nupdate) throws IOException {
+    assert nupdate <= NFILES;
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < nupdate; ++idx) {
+      Path fPath = new Path(root, files[idx].getName());
+      // overwrite file
+      assertTrue(fPath.toString() + " does not exist", fs.exists(fPath));
+      FSDataOutputStream out = fs.create(fPath);
+      files[idx].reset();
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+    }
+  }
+
+  private static FileStatus[] getFileStatus(String namenode,
+      String topdir, MyFile[] files) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(namenode, conf);
+    Path root = new Path(topdir);
+    FileStatus[] ret = new FileStatus[NFILES];
+    for (int idx = 0; idx < NFILES; ++idx) {
+      ret[idx] = fs.getFileStatus(new Path(root, files[idx].getName()));
+    }
+    return ret;
+  }
+
+  private static boolean checkUpdate(FileStatus[] old, String namenode,
+      String topdir, MyFile[] upd, final int nupdate) throws IOException {
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(namenode, conf);
+    Path root = new Path(topdir);
+
+    // overwrote updated files
+    for (int idx = 0; idx < nupdate; ++idx) {
+      final FileStatus stat =
+        fs.getFileStatus(new Path(root, upd[idx].getName()));
+      if (stat.getModificationTime() <= old[idx].getModificationTime()) {
+        return false;
+      }
+    }
+    // did not overwrite files not updated
+    for (int idx = nupdate; idx < NFILES; ++idx) {
+      final FileStatus stat =
+        fs.getFileStatus(new Path(root, upd[idx].getName()));
+      if (stat.getModificationTime() != old[idx].getModificationTime()) {
+        return false;
+      }
+    }
+    return true;
+  }
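
Note: checkUpdate verifies -update purely through modification times: strictly newer for
the first nupdate files, identical timestamps for the rest. The actual "needs copy"
criterion lives in CopyFiles.java, not in this test; reset() forcing both a new size and
a new seed means any size- or content-based predicate would fire. A plausible, purely
illustrative predicate:

    // Hypothetical -update predicate, for illustration only; the test
    // asserts its effect (mtimes) rather than its definition.
    static boolean needsCopy(FileStatus src, FileSystem dstFs, Path dst)
        throws IOException {
      if (!dstFs.exists(dst)) {
        return true;                                  // new file: always copy
      }
      return src.getLen() != dstFs.getFileStatus(dst).getLen();  // assumed size check
    }
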
+
   /** delete directory and everything underneath it.*/
   private static void deldir(String fsname, String topdir)
     throws IOException {
@@ -169,8 +233,8 @@
   public void testCopyFromLocalToLocal() throws Exception {
     MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
     ToolRunner.run(new CopyFiles(new Configuration()),
-                           new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                         "file://"+TEST_ROOT_DIR+"/destdat"});
+                           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
                checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
     deldir("local", TEST_ROOT_DIR+"/destdat");
@@ -187,14 +251,15 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat",
-                                                   "-log",
-                                                   "hdfs://"+namenode+"/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
         deldir(namenode, "/srcdat");
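
Note: the reordered arguments in these tests reflect the reworked command line: flags
such as -log (and, below, -p, -update, -overwrite) now precede the positional arguments,
with sources first and the destination last. Schematically, with placeholder paths:

    // Options first, then source(s), destination last. The host and paths
    // here are placeholders, not values from the test.
    String[] args = {
      "-log", "hdfs://namenode/logs",   // flag + value
      "hdfs://namenode/srcdat",         // source
      "hdfs://namenode/destdat"         // destination
    };
    ToolRunner.run(new CopyFiles(conf), args);
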
@@ -215,14 +280,15 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat",
-                                                   "-log",
-                                                   "hdfs://"+namenode+"/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "file:///"+TEST_ROOT_DIR+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
         deldir(namenode, "/logs");
@@ -243,14 +309,15 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "file://"+TEST_ROOT_DIR+"/destdat",
-                                                   "-log",
-                                                   "/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("/logs")));
         deldir("local", TEST_ROOT_DIR+"/destdat");
         deldir(namenode, "/logs");
@@ -260,5 +327,65 @@
       if (cluster != null) { cluster.shutdown(); }
     }
   }
-  
+
+  public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory does not exist.",
+                    fs.exists(new Path("hdfs://"+namenode+"/logs")));
+
+        FileStatus[] dchkpoint = getFileStatus(namenode, "/destdat", files);
+        final int nupdate = NFILES>>2;
+        updateFiles(namenode, "/srcdat", files, nupdate);
+        deldir(namenode, "/logs");
+
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-update",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        assertTrue("Update failed to replicate all changes in src",
+                 checkUpdate(dchkpoint, namenode, "/destdat", files, nupdate));
+
+        deldir(namenode, "/logs");
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-overwrite",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        assertTrue("-overwrite didn't.",
+                 checkUpdate(dchkpoint, namenode, "/destdat", files, NFILES));
+
+        deldir(namenode, "/destdat");
+        deldir(namenode, "/srcdat");
+        deldir(namenode, "/logs");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
 }
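
Note: the shape of the new update/overwrite test, condensed: baseline copy with -p,
snapshot the destination's FileStatus, mutate the first NFILES>>2 sources (a right shift
by two is integer division by four), then check that -update advances exactly that
quarter's timestamps while -overwrite advances all of them. As a skeleton, with a
hypothetical runDistcp wrapper standing in for the ToolRunner invocations:

    // Skeleton only; runDistcp is a hypothetical wrapper over
    // ToolRunner.run(new CopyFiles(conf), ...) with fixed log/src/dst args.
    MyFile[] files = createFiles(namenode, "/srcdat");
    runDistcp("-p");
    FileStatus[] before = getFileStatus(namenode, "/destdat", files);
    final int nupdate = NFILES >> 2;                  // a quarter of the files
    updateFiles(namenode, "/srcdat", files, nupdate);
    runDistcp("-p", "-update");
    assertTrue(checkUpdate(before, namenode, "/destdat", files, nupdate));
    runDistcp("-p", "-overwrite");
    assertTrue(checkUpdate(before, namenode, "/destdat", files, NFILES));
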