You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/14 09:48:05 UTC

svn commit: r421826 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/util/CopyFiles.java

Author: cutting
Date: Fri Jul 14 00:48:05 2006
New Revision: 421826

URL: http://svn.apache.org/viewvc?rev=421826&view=rev
Log:
HADOOP-341.  Permit input URIs to CopyFiles to use the HTTP protocol.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=421826&r1=421825&r2=421826&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jul 14 00:48:05 2006
@@ -27,6 +27,10 @@
  7. HADOOP-359.  Permit map output to be compressed.
    (omalley via cutting)
 
+ 8. HADOOP-341.  Permit input URI to CopyFiles to use the HTTP
+    protocol.  This lets one, e.g., more easily copy log files into
+    DFS.  (Arun C Murthy via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=421826&r1=421825&r2=421826&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Fri Jul 14 00:48:05 2006
@@ -16,15 +16,23 @@
 
 package org.apache.hadoop.util;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.File;
+import java.net.HttpURLConnection;
 import java.net.URI;
+import java.net.URL;
 import java.net.URISyntaxException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.StringTokenizer;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,30 +62,104 @@
 public class CopyFiles extends ToolBase {
   
   private static final String usage = "distcp "+
-          "[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
-          "[-conf <config-file.xml>] " + "[-D <property=value>] "+
-          "[-i] <srcurl> <desturl>";
+  "[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
+  "[-conf <config-file.xml>] " + "[-D <property=value>] "+
+  "[-i] <srcurl> <desturl>";
   
   private static final long MIN_BYTES_PER_MAP = 1L << 28;
   private static final int MAX_NUM_MAPS = 10000;
   private static final int MAX_MAPS_PER_NODE = 10;
+  
   private static final String readFailuresAttribute = 
-     "distcp.ignore.read.failures";
+    "distcp.ignore.read.failures";
   
   public void setConf(Configuration conf) {
-      if (conf instanceof JobConf) {
-        this.conf = (JobConf) conf;
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+  
+  /**
+   * Base-class for all mappers for distcp
+   * @author Arun C Murthy
+   */
+  public static abstract class CopyFilesMapper extends MapReduceBase 
+  {
+    /**
+     * Interface to initialize *distcp* specific map tasks.
+     * @param conf : The dfs/mapred configuration.
+     * @param jobConf : The handle to the jobConf object to be initialized.
+     * @param srcPaths : The source paths.
+     * @param destPath : The destination path.
+     * @param ignoreReadFailures : Ignore read failures?
+     * @throws IOException
+     */
+    public abstract void setup(Configuration conf, JobConf jobConf, 
+        String[] srcPaths, String destPath, boolean ignoreReadFailures) 
+    throws IOException;
+    
+    /**
+     * Interface to cleanup *distcp* specific resources
+     * @param conf : The dfs/mapred configuration.
+     * @param jobConf : The handle to the jobConf object to be initialized.
+     * @param srcPath : The source uri.
+     * @param destPath : The destination uri.
+     * @throws IOException
+     */
+    public abstract void cleanup(Configuration conf, JobConf jobConf, 
+        String srcPath, String destPath) throws IOException;
+    
+    public String getFileSysName(URI url) {
+      String fsname = url.getScheme();
+      if ("dfs".equals(fsname)) {
+        String host = url.getHost();
+        int port = url.getPort();
+        return (port==(-1)) ? host : (host+":"+port);
       } else {
-        this.conf = new JobConf(conf);
+        return "local";
+      }
+    }
+    
+    /**
+     * Make a path relative with respect to a root path.
+     * absPath is always assumed to descend from root.
+     * Otherwise returned path is null.
+     */
+    public Path makeRelative(Path root, Path absPath) {
+      if (!absPath.isAbsolute()) { return absPath; }
+      String sRoot = root.toString();
+      String sPath = absPath.toString();
+      Enumeration rootTokens = new StringTokenizer(sRoot, "/");
+      ArrayList rList = Collections.list(rootTokens);
+      Enumeration pathTokens = new StringTokenizer(sPath, "/");
+      ArrayList pList = Collections.list(pathTokens);
+      Iterator rIter = rList.iterator();
+      Iterator pIter = pList.iterator();
+      while (rIter.hasNext()) {
+        String rElem = (String) rIter.next();
+        String pElem = (String) pIter.next();
+        if (!rElem.equals(pElem)) { return null; }
       }
+      StringBuffer sb = new StringBuffer();
+      while (pIter.hasNext()) {
+        String pElem = (String) pIter.next();
+        sb.append(pElem);
+        if (pIter.hasNext()) { sb.append("/"); }
+      }
+      return new Path(sb.toString());
+    }
+    
   }
   
   /**
-   * Mappper class for Copying files.
+   * DFSCopyFilesMapper: The mapper for copying files from the DFS.
+   * @author Milind Bhandarkar
    */
-  
-  public static class CopyFilesMapper extends MapReduceBase implements Mapper {
-    
+  public static class DFSCopyFilesMapper extends CopyFilesMapper 
+  implements Mapper 
+  {
     private int sizeBuf = 4096;
     private FileSystem srcFileSys = null;
     private FileSystem destFileSys = null;
@@ -109,14 +191,14 @@
         out.write(buffer, 0, nread);
         bytesSinceLastReport += nread;
         if (bytesSinceLastReport > reportInterval) {
-            totalBytesCopied += bytesSinceLastReport;
-            bytesSinceLastReport = 0L;
-            reporter.setStatus("Copy "+ src + ": " + 
-                               percentFormat.format(100.0 * totalBytesCopied / 
-                                                    totalBytes) +
-                               "% and " +
-                               StringUtils.humanReadableInt(totalBytesCopied) +
-                               " bytes");
+          totalBytesCopied += bytesSinceLastReport;
+          bytesSinceLastReport = 0L;
+          reporter.setStatus("Copy "+ src + ": " + 
+              percentFormat.format(100.0 * totalBytesCopied / 
+                  totalBytes) +
+                  "% and " +
+                  StringUtils.humanReadableInt(totalBytesCopied) +
+          " bytes");
         }
       }
       
@@ -126,7 +208,151 @@
       totalBytesCopied += bytesSinceLastReport;
       bytesSinceLastReport = 0L;
       reporter.setStatus("Finished. Bytes copied: " + 
-                         StringUtils.humanReadableInt(totalBytesCopied));
+          StringUtils.humanReadableInt(totalBytesCopied));
+    }
+    
+    /**
+     * Initialize DFSCopyFileMapper specific job-configuration.
+     * @param conf : The dfs/mapred configuration.
+     * @param jobConf : The handle to the jobConf object to be initialized.
+     * @param srcPaths : The source URIs.
+     * @param destPath : The destination URI.
+     * @param ignoreReadFailures : Ignore read failures?
+     */
+    public void setup(Configuration conf, JobConf jobConf, 
+        String[] srcPaths, String destPath, 
+        boolean ignoreReadFailures) 
+    throws IOException
+    {
+      URI srcURI = null;
+      URI destURI = null;
+      try {
+        srcURI = new URI(srcPaths[0]);
+        destURI = new URI(destPath);
+      } catch (URISyntaxException ex) {
+        throw new RuntimeException("URL syntax error.", ex);
+      }
+      
+      String srcFileSysName = getFileSysName(srcURI);
+      String destFileSysName = getFileSysName(destURI);
+      jobConf.set("copy.src.fs", srcFileSysName);
+      jobConf.set("copy.dest.fs", destFileSysName);
+      
+      FileSystem srcfs = FileSystem.getNamed(srcFileSysName, conf);
+      
+      String srcPath = srcURI.getPath();
+      if ("".equals(srcPath)) { srcPath = "/"; }
+      destPath = destURI.getPath();
+      if ("".equals(destPath)) { destPath = "/"; }
+      
+      Path tmpPath = new Path(srcPath);
+      Path rootPath = new Path(srcPath);
+      if (srcfs.isFile(tmpPath)) {
+        tmpPath = tmpPath.getParent();
+        rootPath = rootPath.getParent();
+        jobConf.set("copy.src.path", tmpPath.toString());
+      } else {
+        jobConf.set("copy.src.path", srcPath);
+      }
+      jobConf.set("copy.dest.path", destPath);
+      
+      if (!srcfs.exists(tmpPath)) {
+        System.out.println(srcPath+" does not exist.");
+        return;
+      }
+      
+      // turn off speculative execution, because DFS doesn't handle
+      // multiple writers to the same file.
+      jobConf.setSpeculativeExecution(false);
+      jobConf.setInputKeyClass(UTF8.class);
+      jobConf.setInputValueClass(UTF8.class);
+      jobConf.setInputFormat(SequenceFileInputFormat.class);
+      
+      jobConf.setOutputKeyClass(UTF8.class);
+      jobConf.setOutputValueClass(UTF8.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      
+      jobConf.setMapperClass(DFSCopyFilesMapper.class);
+      jobConf.setReducerClass(CopyFilesReducer.class);
+      
+      jobConf.setNumReduceTasks(1);
+      jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
+      
+      Random r = new Random();
+      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" 
+          + Integer.toString(Math.abs(r.nextInt()), 36));
+      Path inDir = new Path(jobDirectory, "in");
+      Path fakeOutDir = new Path(jobDirectory, "out");
+      FileSystem fileSys = FileSystem.get(jobConf);
+      fileSys.mkdirs(inDir);
+      jobConf.set("distcp.job.dir", jobDirectory.toString());
+      
+      jobConf.setInputPath(inDir);
+      jobConf.setOutputPath(fakeOutDir);
+      
+      // create new sequence-files for holding paths
+      ArrayList pathList = new ArrayList();
+      ArrayList finalPathList = new ArrayList();
+      pathList.add(new Path(srcPath));
+      long totalBytes = 0;
+      //int part = 0;
+      while(!pathList.isEmpty()) {
+        Path top = (Path) pathList.remove(0);
+        if (srcfs.isFile(top)) {
+          totalBytes += srcfs.getLength(top);
+          top = makeRelative(rootPath, top);
+          finalPathList.add(top.toString());
+        } else {
+          Path[] paths = srcfs.listPaths(top);
+          for (int idx = 0; idx < paths.length; idx++) {
+            pathList.add(paths[idx]);
+          }
+        }
+      }
+      
+      // ideal number of maps is one per file (if the map-launching overhead
+      // were 0. It is limited by jobtrackers handling capacity, which lets say
+      // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
+      // small files it is better to determine number of maps by amount of 
+      // data per map.
+      int nFiles = finalPathList.size();
+      int numMaps = nFiles;
+      if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
+      if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
+        numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+      }
+      JobClient client = new JobClient(jobConf);
+      ClusterStatus cluster = client.getClusterStatus();
+      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+      if (numMaps > tmpMaps) { numMaps = tmpMaps; }
+      if (numMaps == 0) { numMaps = 1; }
+      jobConf.setNumMapTasks(numMaps);
+      
+      for(int idx=0; idx < numMaps; ++idx) {
+        Path file = new Path(inDir, "part"+idx);
+        SequenceFile.Writer writer = 
+          new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
+        for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
+          String path = (String) finalPathList.get(ipath);
+          writer.append(new UTF8(path), new UTF8(""));
+        }
+        writer.close();
+      }
+      finalPathList = null;
+      
+    }
+    
+    public void cleanup(Configuration conf, JobConf jobConf, 
+        String srcPath, String destPath) 
+    throws IOException
+    {
+      //Clean up jobDirectory
+      Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
+      FileSystem fs = FileSystem.get(jobConf);
+      
+      if(!jobDirectory.equals("/")) {
+        fs.delete(jobDirectory);
+      }
     }
     
     /** Mapper configuration.
@@ -134,7 +360,8 @@
      * top-level paths on source and destination directories.
      * Gets the named file systems, to be used later in map.
      */
-    public void configure(JobConf job) {
+    public void configure(JobConf job) 
+    {
       String srcfs = job.get("copy.src.fs", "local");
       String destfs = job.get("copy.dest.fs", "local");
       srcPath = new Path(job.get("copy.src.path", "/"));
@@ -154,9 +381,10 @@
      * @param key source file name
      * @param value not-used.
      * @param out not-used.
+     * @param reporter
      */
     public void map(WritableComparable key,
-        Writable val,
+        Writable value,
         OutputCollector out,
         Reporter reporter) throws IOException {
       String src = ((UTF8) key).toString();
@@ -165,7 +393,7 @@
       } catch (IOException except) {
         if (ignoreReadFailures) {
           reporter.setStatus("Failed to copy " + src + " : " + 
-                             StringUtils.stringifyException(except));
+              StringUtils.stringifyException(except));
           try {
             destFileSys.delete(new Path(destPath, src));
           } catch (Throwable ex) {
@@ -180,56 +408,315 @@
     public void close() {
       // nothing
     }
+    
   }
   
-  public static class CopyFilesReducer extends MapReduceBase implements Reducer {
-      public void reduce(WritableComparable key,
-                         Iterator values,
-                         OutputCollector output,
-                         Reporter reporter) throws IOException {
-          // nothing
+  public static class HTTPCopyFilesMapper extends CopyFilesMapper 
+  implements Mapper 
+  {
+    private URI srcURI = null;
+    private FileSystem destFileSys = null;
+    private Path destPath = null;
+    private JobConf jobConf = null;
+    private boolean ignoreReadFailures;
+    
+    /**
+     * Initialize HTTPCopyFileMapper specific job.
+     * @param conf : The dfs/mapred configuration.
+     * @param jobConf : The handle to the jobConf object to be initialized.
+     * @param srcPaths : The source URI.
+     * @param destPath : The destination URI.
+     * @param ignoreReadFailures : Ignore read failures?
+     */
+    public void setup(Configuration conf, JobConf jobConf, 
+        String[] srcPaths, String destPath, 
+        boolean ignoreReadFailures) 
+    throws IOException
+    {
+      //Destination
+      URI destURI = null;
+      try {
+        destURI = new URI(destPath);
+      } catch (URISyntaxException ue) {
+        throw new IOException("Illegal destination path!");
       }
+      String destFileSysName = getFileSysName(destURI);
+      jobConf.set("copy.dest.fs", destFileSysName);
+      destPath = destURI.getPath();
+      jobConf.set("copy.dest.path", destPath);
+      
+      //Setup the MR-job configuration
+      jobConf.setSpeculativeExecution(false);
+      
+      jobConf.setInputKeyClass(UTF8.class);
+      jobConf.setInputValueClass(UTF8.class);
+      jobConf.setInputFormat(SequenceFileInputFormat.class);
+      
+      jobConf.setOutputKeyClass(UTF8.class);
+      jobConf.setOutputValueClass(UTF8.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      
+      jobConf.setMapperClass(HTTPCopyFilesMapper.class);
+      jobConf.setReducerClass(CopyFilesReducer.class);
+      
+      // ideal number of maps is one per file (if the map-launching overhead
+      // were 0. It is limited by jobtrackers handling capacity, which lets say
+      // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for 
+      // small files it is better to determine number of maps by 
+      // amount of data per map.
+      int nFiles = srcPaths.length;
+      int numMaps = nFiles;
+      if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
+      JobClient client = new JobClient(jobConf);
+      ClusterStatus cluster = client.getClusterStatus();
+      int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+      if (numMaps > tmpMaps) { numMaps = tmpMaps; }
+      if (numMaps == 0) { numMaps = 1; }
+      jobConf.setNumMapTasks(numMaps);
+      
+      jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
+      
+      FileSystem fileSystem = FileSystem.get(conf);
+      Random r = new Random();
+      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + 
+          Integer.toString(Math.abs(r.nextInt()), 36));
+      Path jobInputDir = new Path(jobDirectory, "in");
+      fileSystem.mkdirs(jobInputDir);
+      jobConf.setInputPath(jobInputDir);
+      
+      jobConf.set("distcp.job.dir", jobDirectory.toString());
+      Path jobOutputDir = new Path(jobDirectory, "out");
+      jobConf.setOutputPath(jobOutputDir);
+      
+      for(int i=0; i < srcPaths.length; ++i) {
+        Path ipFile = new Path(jobInputDir, "part" + i);
+        SequenceFile.Writer writer = 
+          new SequenceFile.Writer(fileSystem, ipFile, UTF8.class, UTF8.class);
+        writer.append(new UTF8(srcPaths[i]), new UTF8(""));
+        writer.close();
+      }
+    }	
+    
+    public void cleanup(Configuration conf, JobConf jobConf, 
+        String srcPath, String destPath) 
+    throws IOException
+    {
+      //Clean up jobDirectory
+      Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
+      FileSystem fs = FileSystem.get(jobConf);
+      
+      if(!jobDirectory.equals("/")) {
+        fs.delete(jobDirectory);
+      }
+    }
+    
+    public void configure(JobConf job)
+    {
+      //Save jobConf
+      jobConf = job;
+      
+      try {
+        //Destination
+        destFileSys = 
+          FileSystem.getNamed(job.get("copy.dest.fs", "local"), job);
+        destPath = new Path(job.get("copy.dest.path", "/"));
+        if(!destFileSys.exists(destPath)) {
+          return;
+        }
+      } catch(IOException ioe) {
+        return;
+      }
+      
+      ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
+    }
+    
+    public void map(WritableComparable key,
+        Writable val,
+        OutputCollector out,
+        Reporter reporter) throws IOException 
+        {
+      //The url of the file
+      try {
+        srcURI = new URI(((UTF8)key).toString());
+        
+        //Construct the complete destination path
+        File urlPath = new File(srcURI.getPath());
+        Path destinationPath = new Path(destPath, urlPath.getName());
+        
+        //Copy the file 
+        URL url = srcURI.toURL();
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("GET");
+        connection.connect();
+        
+        int bufferSize = jobConf.getInt("io.file.buffer.size", 4096);
+        byte[] buffer = new byte[bufferSize];
+        BufferedInputStream is = 
+          new BufferedInputStream(connection.getInputStream());
+        
+        FSDataOutputStream os = 
+          new FSDataOutputStream(destFileSys, destinationPath, true, 
+              jobConf,	bufferSize, (short)jobConf.getInt("dfs.replication", 3), 
+              jobConf.getLong("dfs.block.size", 67108864)
+          );
+        
+        int readBytes = 0;
+        while((readBytes = is.read(buffer, 0, bufferSize)) != -1) {
+          os.write(buffer, 0, readBytes);
+        }
+        
+        is.close();
+        os.close();
+        connection.disconnect();
+        
+        reporter.setStatus("Copied: " + srcURI.toString() + 
+            " to: " + destinationPath.toString());
+        
+      } catch(Exception e) {
+        reporter.setStatus("Failed to copy from: " + (UTF8)key);
+        if(ignoreReadFailures) {
+          return;
+        } else {
+          throw new IOException("Failed to copy from: " + (UTF8)key);
+        }
+      }
+    }
   }
   
-  private static String getFileSysName(URI url) {
-    String fsname = url.getScheme();
-    if ("dfs".equals(fsname)) {
-      String host = url.getHost();
-      int port = url.getPort();
-      return (port==(-1)) ? host : (host+":"+port);
-    } else {
-      return "local";
+  /**
+   * Factory to create requisite Mapper objects for distcp.
+   * @author Arun C Murthy
+   */
+  private static class CopyMapperFactory
+  {
+    public static CopyFilesMapper getMapper(String protocol)
+    {
+      CopyFilesMapper mapper = null;
+      
+      if("dfs".equals(protocol) || "file".equals(protocol)) {
+        mapper = new DFSCopyFilesMapper();
+      } else if("http".equals(protocol)) {
+        mapper = new HTTPCopyFilesMapper();
+      }
+      
+      return mapper;
     }
   }
-
+  
+  public static class CopyFilesReducer extends MapReduceBase implements Reducer {
+    public void reduce(WritableComparable key,
+        Iterator values,
+        OutputCollector output,
+        Reporter reporter) throws IOException {
+      // nothing
+    }
+  }
+  
   /**
-   * Make a path relative with respect to a root path.
-   * absPath is always assumed to descend from root.
-   * Otherwise returned path is null.
+   * Helper function to parse input file and return source urls for 
+   * a given protocol.
+   * @param protocol : The protocol for which to find source urls.
+   * @param inputFilePath : The file containing the urls.
+   * @return
    */
-  private static Path makeRelative(Path root, Path absPath) {
-    if (!absPath.isAbsolute()) { return absPath; }
-    String sRoot = root.toString();
-    String sPath = absPath.toString();
-    Enumeration rootTokens = new StringTokenizer(sRoot, "/");
-    ArrayList rList = Collections.list(rootTokens);
-    Enumeration pathTokens = new StringTokenizer(sPath, "/");
-    ArrayList pList = Collections.list(pathTokens);
-    Iterator rIter = rList.iterator();
-    Iterator pIter = pList.iterator();
-    while (rIter.hasNext()) {
-      String rElem = (String) rIter.next();
-      String pElem = (String) pIter.next();
-      if (!rElem.equals(pElem)) { return null; }
-    }
-    StringBuffer sb = new StringBuffer();
-    while (pIter.hasNext()) {
-      String pElem = (String) pIter.next();
-      sb.append(pElem);
-      if (pIter.hasNext()) { sb.append("/"); }
+  private static String[] parseInputFile(String protocol, String inputFilePath)
+  throws IOException
+  {
+    ArrayList urls = new ArrayList();
+    String url;
+    BufferedReader fis = new BufferedReader(new FileReader(inputFilePath));
+    while((url = fis.readLine()) != null) {
+      if(!url.startsWith("#") && url.startsWith(protocol)) {
+        urls.add(url);
+      }
+    }
+    fis.close();
+    
+    if(!urls.isEmpty()) {
+      return (String[])urls.toArray(new String[0]);
+    }
+    
+    return null;
+  }
+  
+  /**
+   * Driver to copy srcPath to destPath depending on required protocol.
+   * @param conf : Configuration
+   * @param srcPath : Source path
+   * @param destPath : Destination path
+   */
+  public static void copy(Configuration conf, String srcPath, String destPath,
+      boolean srcAsList, boolean ignoreReadFailures) 
+  throws IOException
+  {
+    //Job configuration
+    JobConf jobConf = new JobConf(conf, CopyFiles.class);
+    jobConf.setJobName("distcp");
+    
+    URI srcURI = null;
+    //URI destURI = null;
+    try {
+      if(!srcAsList) {
+        srcURI = new URI(srcPath);
+      }
+      //destURI = new URI(destPath);
+    } catch (URISyntaxException ex) {
+      throw new IOException("Illegal source path!");
+    }
+    
+    //Source paths
+    String[] srcPaths = null;
+    
+    //Create the task-specific mapper 
+    CopyFilesMapper mapper = null;
+    if(srcAsList) {
+      //Ugly?!
+      
+      // Protocol - 'dfs://'
+      String[] dfsUrls = parseInputFile("dfs", srcPath);
+      if(dfsUrls != null) {
+        for(int i=0; i < dfsUrls.length; ++i) {
+          copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
+        }
+      }
+      
+      // Protocol - 'file://'
+      String[] localUrls = parseInputFile("file", srcPath);
+      if(localUrls != null) {
+        for(int i=0; i < localUrls.length; ++i) {
+          copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
+        }
+      }
+      
+      // Protocol - 'http://'
+      String[] httpUrls = parseInputFile("http", srcPath);
+      if(httpUrls != null) {
+        srcPaths = httpUrls;
+        mapper = CopyMapperFactory.getMapper("http");
+      } else {
+        //Done
+        return;
+      }
+      
+    } else {
+      //Single source - ugly!
+      String[] tmpSrcPath = {srcPath};
+      srcPaths = tmpSrcPath;
+      mapper = CopyMapperFactory.getMapper(srcURI.getScheme());
+    }
+    
+    //Initialize the mapper
+    mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures);
+    
+    //We are good to go!
+    try {
+      JobClient.runJob(jobConf);
+    } finally {
+      mapper.cleanup(conf, jobConf, srcPath, destPath);
     }
-    return new Path(sb.toString());
+    
   }
+  
   /**
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
@@ -242,159 +729,43 @@
     String srcPath = null;
     String destPath = null;
     boolean ignoreReadFailures = false;
+    boolean srcAsList = false;
     
     for (int idx = 0; idx < args.length; idx++) {
-        if ("-i".equals(args[idx])) {
-          ignoreReadFailures = true;
-        } else if (srcPath == null) {
-                srcPath = args[idx];
-        } else if (destPath == null) {
-                destPath = args[idx];
-        } else {
-                System.out.println(usage);
-                return -1;
-        }
+      if ("-i".equals(args[idx])) {
+        ignoreReadFailures = true;
+      } else if ("-f".equals(args[idx])) {
+        srcAsList = true;
+      } else if (srcPath == null) {
+        srcPath = args[idx];
+      } else if (destPath == null) {
+        destPath = args[idx];
+      } else {
+        System.out.println(usage);
+        return -1;
+      }
     }
     
     // mandatory command-line parameters
     if (srcPath == null || destPath == null) {
-        System.out.println(usage);
-        return -1;
+      System.out.println(usage);
+      return -1;
     }
     
-    URI srcurl = null;
-    URI desturl = null;
     try {
-      srcurl = new URI(srcPath);
-      desturl = new URI(destPath);
-    } catch (URISyntaxException ex) {
-      throw new RuntimeException("URL syntax error.", ex);
-    }
-    
-    JobConf jobConf = (JobConf)conf;
-    
-    jobConf.setJobName("copy-files");
-    
-    String srcFileSysName = getFileSysName(srcurl);
-    String destFileSysName = getFileSysName(desturl);
-    
-    jobConf.set("copy.src.fs", srcFileSysName);
-    jobConf.set("copy.dest.fs", destFileSysName);
-    FileSystem srcfs;
-   
-    srcfs = FileSystem.getNamed(srcFileSysName, jobConf);
-    FileSystem destfs = FileSystem.getNamed(destFileSysName, jobConf);
- 
-    srcPath = srcurl.getPath();
-    if ("".equals(srcPath)) { srcPath = "/"; }
-    destPath = desturl.getPath();
-    if ("".equals(destPath)) { destPath = "/"; }
-    
-    Path tmpPath = new Path(srcPath);
-    Path rootPath = new Path(srcPath);
-    if (srcfs.isFile(tmpPath)) {
-      tmpPath = tmpPath.getParent();
-      rootPath = rootPath.getParent();
-      jobConf.set("copy.src.path", tmpPath.toString());
-    } else {
-      jobConf.set("copy.src.path", srcPath);
-    }
-    jobConf.set("copy.dest.path", destPath);
-    
-    if (!srcfs.exists(tmpPath)) {
-      System.out.println(srcPath+" does not exist.");
+      copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
+    } catch (Exception e) {
+      System.out.println("Caught: " + e);
       return -1;
     }
     
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setInputKeyClass(UTF8.class);
-    jobConf.setInputValueClass(UTF8.class);
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-        
-    jobConf.setOutputKeyClass(UTF8.class);
-    jobConf.setOutputValueClass(UTF8.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    
-    jobConf.setMapperClass(CopyFilesMapper.class);
-    jobConf.setReducerClass(CopyFilesReducer.class);
-    
-    jobConf.setNumReduceTasks(1);
-    jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
-
-    Path tmpDir = new Path("copy-files");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    fileSys.delete(tmpDir);
-    fileSys.mkdirs(inDir);
-    
-    
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
-    
-    // create new sequence-files for holding paths
-    ArrayList pathList = new ArrayList();
-    ArrayList finalPathList = new ArrayList();
-    pathList.add(new Path(srcPath));
-    long totalBytes = 0;
-    while(!pathList.isEmpty()) {
-      Path top = (Path) pathList.remove(0);
-      if (srcfs.isFile(top)) {
-        totalBytes += srcfs.getLength(top);
-        top = makeRelative(rootPath, top);
-        finalPathList.add(top.toString());
-      } else {
-        Path[] paths = srcfs.listPaths(top);
-        for (int idx = 0; idx < paths.length; idx++) {
-          pathList.add(paths[idx]);
-        }
-      }
-    }
-    // ideal number of maps is one per file (if the map-launching overhead
-    // were 0. It is limited by jobtrackers handling capacity, which lets say
-    // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for small
-    // files it is better to determine number of maps by amount of data per map.
-    
-    int nFiles = finalPathList.size();
-    int numMaps = nFiles;
-    if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
-    if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
-        numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
-    }
-    JobClient client = new JobClient(jobConf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
-    if (numMaps > tmpMaps) { numMaps = tmpMaps; }
-    if (numMaps == 0) { numMaps = 1; }
-    jobConf.setNumMapTasks(numMaps);
-    
-    for(int idx=0; idx < numMaps; ++idx) {
-      Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
-      for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
-        String path = (String) finalPathList.get(ipath);
-        writer.append(new UTF8(path), new UTF8(""));
-      }
-      writer.close();
-    }
-    finalPathList = null;
-    
-    int exitCode = -1;
-    try {
-      JobClient.runJob(jobConf);
-      exitCode = 0;
-    } finally {
-      fileSys.delete(tmpDir);
-    }
-  
-    return exitCode;
+    return 0;
   }
   
   public static void main(String[] args) throws Exception {
-      new CopyFiles().doMain(
-              new JobConf(new Configuration(), CopyFiles.class), 
-              args);
+    new CopyFiles().doMain(
+        new JobConf(new Configuration(), CopyFiles.class), 
+        args);
   }
+  
 }