You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/18 14:02:00 UTC

svn commit: r423058 - /lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java

Author: cutting
Date: Tue Jul 18 05:01:59 2006
New Revision: 423058

URL: http://svn.apache.org/viewvc?rev=423058&view=rev
Log:
Further improvements to HADOOP-341.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=423058&r1=423057&r2=423058&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Jul 18 05:01:59 2006
@@ -21,6 +21,7 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.File;
+import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
@@ -64,7 +65,7 @@
   private static final String usage = "distcp "+
   "[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
   "[-conf <config-file.xml>] " + "[-D <property=value>] "+
-  "[-i] <srcurl> <desturl>";
+  "[-i] (<srcurl> | -f <urilist_uri>) <desturl>";
   
   private static final long MIN_BYTES_PER_MAP = 1L << 28;
   private static final int MAX_NUM_MAPS = 10000;
@@ -111,7 +112,7 @@
     public abstract void cleanup(Configuration conf, JobConf jobConf, 
         String srcPath, String destPath) throws IOException;
     
-    public String getFileSysName(URI url) {
+    public static String getFileSysName(URI url) {
       String fsname = url.getScheme();
       if ("dfs".equals(fsname)) {
         String host = url.getHost();
@@ -127,7 +128,7 @@
      * absPath is always assumed to descend from root.
      * Otherwise returned path is null.
      */
-    public Path makeRelative(Path root, Path absPath) {
+    public static Path makeRelative(Path root, Path absPath) {
       if (!absPath.isAbsolute()) { return absPath; }
       String sRoot = root.toString();
       String sPath = absPath.toString();
@@ -612,6 +613,50 @@
     }
   }
   
+  /**
+   * Reads the list of source URIs, one per line, from the file named by
+   * srcListURI (file:, dfs: or http: scheme). Lines starting with '#' are
+   * skipped. Returns null if no URIs were read. The reader is closed even
+   * when reading fails part-way through.
+   */
+  private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException
+  {
+    ArrayList uris = new ArrayList();
+    BufferedReader fis = null;
+    String srcListURIScheme = srcListURI.getScheme();
+    String srcListURIPath = srcListURI.getPath();
+    if("file".equals(srcListURIScheme)) {
+      fis = new BufferedReader(new FileReader(srcListURIPath));
+    } else if("dfs".equals(srcListURIScheme)) {
+      FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf);
+      fis = new BufferedReader(
+          new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf)));
+    } else if("http".equals(srcListURIScheme)) {
+      HttpURLConnection connection = (HttpURLConnection)srcListURI.toURL().openConnection();
+      connection.setRequestMethod("GET");
+      connection.connect();
+      fis = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+    } else {
+      throw new IOException("Unsupported source list uri: " + srcListURI);
+    }
+    // Close the reader in a finally block so a failing readLine() cannot leak it.
+    try {
+      String uri = null;
+      while((uri = fis.readLine()) != null) {
+        if(!uri.startsWith("#")) {
+          uris.add(uri);
+        }
+      }
+    } finally {
+      fis.close();
+    }
+    // NOTE(review): parseInputFile reads uris.length, so a null result here NPEs there.
+    if(!uris.isEmpty()) {
+      return (String[])uris.toArray(new String[0]);
+    }
+    return null;
+  }
+  
   /**
    * Helper function to parse input file and return source urls for 
    * a given protocol.
@@ -619,21 +664,19 @@
    * @param inputFilePath : The file containing the urls.
    * @return
    */
-  private static String[] parseInputFile(String protocol, String inputFilePath)
+  private static String[] parseInputFile(String protocol, String[] uris)
   throws IOException
   {
-    ArrayList urls = new ArrayList();
-    String url;
-    BufferedReader fis = new BufferedReader(new FileReader(inputFilePath));
-    while((url = fis.readLine()) != null) {
-      if(!url.startsWith("#") && url.startsWith(protocol)) {
-        urls.add(url);
+    ArrayList protocolURIs = new ArrayList();
+    
+    for(int i=0; i < uris.length; ++i) {
+      if(uris[i].startsWith(protocol)) {
+        protocolURIs.add(uris[i]);
       }
     }
-    fis.close();
     
-    if(!urls.isEmpty()) {
-      return (String[])urls.toArray(new String[0]);
+    if(!protocolURIs.isEmpty()) {
+      return (String[])protocolURIs.toArray(new String[0]);
     }
     
     return null;
@@ -653,27 +696,35 @@
     JobConf jobConf = new JobConf(conf, CopyFiles.class);
     jobConf.setJobName("distcp");
     
+    //Sanity check for srcPath/destPath
     URI srcURI = null;
-    //URI destURI = null;
     try {
-      if(!srcAsList) {
         srcURI = new URI(srcPath);
-      }
-      //destURI = new URI(destPath);
     } catch (URISyntaxException ex) {
       throw new IOException("Illegal source path!");
     }
     
+    URI destURI = null;
+    try {
+      destURI = new URI(destPath);
+    } catch (URISyntaxException ex) {
+      throw new IOException("Illegal destination path!");
+    }
+  
     //Source paths
     String[] srcPaths = null;
     
+    if(srcAsList) {
+      srcPaths = fetchSrcURIs(conf, srcURI);
+    }
+    
     //Create the task-specific mapper 
     CopyFilesMapper mapper = null;
     if(srcAsList) {
       //Ugly?!
       
       // Protocol - 'dfs://'
-      String[] dfsUrls = parseInputFile("dfs", srcPath);
+      String[] dfsUrls = parseInputFile("dfs", srcPaths);
       if(dfsUrls != null) {
         for(int i=0; i < dfsUrls.length; ++i) {
           copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
@@ -681,7 +732,7 @@
       }
       
       // Protocol - 'file://'
-      String[] localUrls = parseInputFile("file", srcPath);
+      String[] localUrls = parseInputFile("file", srcPaths);
       if(localUrls != null) {
         for(int i=0; i < localUrls.length; ++i) {
           copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
@@ -689,7 +740,7 @@
       }
       
       // Protocol - 'http://'
-      String[] httpUrls = parseInputFile("http", srcPath);
+      String[] httpUrls = parseInputFile("http", srcPaths);
       if(httpUrls != null) {
         srcPaths = httpUrls;
         mapper = CopyMapperFactory.getMapper("http");