You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/19 21:52:07 UTC

svn commit: r407894 - in /lucene/hadoop/trunk: bin/hadoop src/java/org/apache/hadoop/util/CopyFiles.java

Author: cutting
Date: Fri May 19 12:52:06 2006
New Revision: 407894

URL: http://svn.apache.org/viewvc?rev=407894&view=rev
Log:
HADOOP-220.  Improvements to mapreduce-based file copier.  Contributed by Milind.

Modified:
    lucene/hadoop/trunk/bin/hadoop
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java

Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/hadoop?rev=407894&r1=407893&r2=407894&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May 19 12:52:06 2006
@@ -39,7 +39,7 @@
   echo "  tasktracker       run a MapReduce task Tracker node" 
   echo "  job               manipulate MapReduce jobs" 
   echo "  jar <jar>         run a jar file"
-  echo "  cp <srcurl> <desturl> copy file or directories recursively"
+  echo "  distcp <srcurl> <desturl> copy file or directories recursively"
   echo " or"
   echo "  CLASSNAME         run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
@@ -137,7 +137,7 @@
   CLASS=org.apache.hadoop.mapred.JobClient
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hadoop.util.RunJar
-elif [ "$COMMAND" = "cp" ] ; then
+elif [ "$COMMAND" = "distcp" ] ; then
   CLASS=org.apache.hadoop.util.CopyFiles
 else
   CLASS=$COMMAND

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=407894&r1=407893&r2=407894&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Fri May 19 12:52:06 2006
@@ -53,8 +53,13 @@
  */
 public class CopyFiles extends MapReduceBase implements Reducer {
   
-  private static final String usage = "cp <srcurl> <desturl>";
+  private static final String usage = "distcp <srcurl> <desturl> "+
+          "[-dfs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
+          "[-config <config-file.xml>]";
   
+  private static final long MIN_BYTES_PER_MAP = 1L << 28;
+  private static final int MAX_NUM_MAPS = 10000;
+  private static final int MAX_MAPS_PER_NODE = 10;
   /**
    * Mappper class for Copying files.
    */
@@ -67,8 +72,11 @@
     private Path srcPath = null;
     private Path destPath = null;
     private byte[] buffer = null;
+    private static final long reportInterval = 1L << 25;
+    private long bytesSinceLastReport = 0L;
+    private long totalBytesCopied = 0L;
     
-    private void copy(String src) throws IOException {
+    private void copy(String src, Reporter reporter) throws IOException {
       // open source file
       Path srcFile = new Path(srcPath, src);
       FSDataInputStream in = srcFileSys.open(srcFile);
@@ -82,14 +90,22 @@
       // copy file
       while (true) {
         int nread = in.read(buffer);
-        if (nread < 0) {
-          break;
-        }
+        if (nread < 0) { break; }
         out.write(buffer, 0, nread);
+        bytesSinceLastReport += nread;
+        if (bytesSinceLastReport > reportInterval) {
+            totalBytesCopied += bytesSinceLastReport;
+            bytesSinceLastReport = 0L;
+            reporter.setStatus("Total bytes copied: "+totalBytesCopied);
+        }
       }
       
       in.close();
       out.close();
+      // report at least once for each file
+      totalBytesCopied += bytesSinceLastReport;
+      bytesSinceLastReport = 0L;
+      reporter.setStatus("Total bytes copied: "+totalBytesCopied);
     }
     
     /** Mapper configuration.
@@ -122,7 +138,7 @@
         OutputCollector out,
         Reporter reporter) throws IOException {
       String src = ((UTF8) key).toString();
-      copy(src);
+      copy(src, reporter);
     }
     
     public void close() {
@@ -185,15 +201,47 @@
    * the reduce is empty.
    */
   public static void main(String[] args) throws IOException {
-    if (args.length != 2) {
-      System.out.println(usage);
-      return;
-    }
-    
+
     Configuration conf = new Configuration();
+    String srcPath = null;
+    String destPath = null;
+    
+    for (int idx = 0; idx < args.length; idx++) {
+        if ("-dfs".equals(args[idx])) {
+            if (idx == (args.length-1)) {
+                System.out.println(usage);
+                return;
+            }
+            conf.set("fs.default.name", args[++idx]);
+        } else if ("-jt".equals(args[idx])) {
+            if (idx == (args.length-1)) {
+                System.out.println(usage);
+                return;
+            }
+            conf.set("mapred.job.tracker", args[++idx]);
+        } else if ("-config".equals(args[idx])) {
+            if (idx == (args.length-1)) {
+                System.out.println(usage);
+                return;
+            }
+            conf.addFinalResource(new Path(args[++idx]));
+        } else {
+            if (srcPath == null) {
+                srcPath = args[idx];
+            } else if (destPath == null) {
+                destPath = args[idx];
+            } else {
+                System.out.println(usage);
+                return;
+            }
+        }
+    }
     
-    String srcPath = args[0];
-    String destPath = args[1];
+    // mandatory command-line parameters
+    if (srcPath == null || destPath == null) {
+        System.out.println(usage);
+        return;
+    }
     
     URI srcurl = null;
     URI desturl = null;
@@ -204,7 +252,7 @@
       throw new RuntimeException("URL syntax error.", ex);
     }
     
-    JobConf jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf, CopyFiles.class);
     jobConf.setJobName("copy-files");
     
     String srcFileSysName = getFileSysName(srcurl);
@@ -254,7 +302,6 @@
     jobConf.setMapperClass(CopyFilesMapper.class);
     jobConf.setReducerClass(CopyFiles.class);
     
-    int filesPerMap = jobConf.getInt("copy.files_per_map", 10);
     jobConf.setNumReduceTasks(1);
 
     Path tmpDir = new Path("copy-files");
@@ -272,10 +319,12 @@
     ArrayList pathList = new ArrayList();
     ArrayList finalPathList = new ArrayList();
     pathList.add(new Path(srcPath));
+    long totalBytes = 0;
     int part = 0;
     while(!pathList.isEmpty()) {
       Path top = (Path) pathList.remove(0);
       if (srcfs.isFile(top)) {
+        totalBytes += srcfs.getLength(top);
         top = makeRelative(rootPath, top);
         finalPathList.add(top.toString());
       } else {
@@ -285,27 +334,34 @@
         }
       }
     }
-    int numMaps = finalPathList.size() / filesPerMap;
+    // ideal number of maps is one per file (if the map-launching overhead
+    // were 0. It is limited by jobtrackers handling capacity, which lets say
+    // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for small
+    // files it is better to determine number of maps by amount of data per map.
+    
+    int nFiles = finalPathList.size();
+    int numMaps = nFiles;
+    if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
+    if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
+        numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+    }
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+    if (numMaps > tmpMaps) { numMaps = tmpMaps; }
     if (numMaps == 0) { numMaps = 1; }
     jobConf.setNumMapTasks(numMaps);
-    SequenceFile.Writer[] writers = new SequenceFile.Writer[numMaps];
     
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      writers[idx] = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
-    }
-    while (!finalPathList.isEmpty()) {
-      String top = (String) finalPathList.remove(0);
-      UTF8 key = new UTF8(top);
-      UTF8 value = new UTF8("");
-      writers[part].append(key, value);
-      part = (part+1)%numMaps;
-    }
-
-    for(part = 0; part < numMaps; part++) {
-      writers[part].close();
-      writers[part] = null;
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
+      for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
+        String path = (String) finalPathList.get(ipath);
+        writer.append(new UTF8(path), new UTF8(""));
+      }
+      writer.close();
     }
+    finalPathList = null;
     
     try {
       JobClient.runJob(jobConf);