You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/15 23:56:38 UTC

svn commit: r547807 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Fri Jun 15 14:56:37 2007
New Revision: 547807

URL: http://svn.apache.org/viewvc?view=rev&rev=547807
Log:
HADOOP-1320.  Rewrite RandomWriter example to bypass reduce.  Contributed by Arun C Murthy.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 15 14:56:37 2007
@@ -147,6 +147,9 @@
  46. HADOOP-1417.  Disable a few FindBugs checks that generate a lot
      of spurious warnings.  (Nigel Daley via cutting)
 
+ 47. HADOOP-1320.  Rewrite RandomWriter example to bypass reduce.
+     (Arun C Murthy via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java Fri Jun 15 14:56:37 2007
@@ -71,6 +71,7 @@
   
   public void testCommandLine()
   {
+    String outFileName = "part-00000";
     File outFile = null;
     try {
       try {
@@ -85,7 +86,7 @@
       // So don't specify -config or -cluster
       job = new StreamJob(genArgs(), mayExit);      
       job.go();
-      outFile = new File(OUTPUT_DIR, "tip_m_map_0000").getAbsoluteFile();
+      outFile = new File(OUTPUT_DIR, outFileName).getAbsoluteFile();
       String output = StreamUtil.slurp(outFile);
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
@@ -94,7 +95,7 @@
       failTrace(e);
     } finally {
       outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR, ".tip_m_map_0000.crc").getAbsoluteFile();
+      File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
       INPUT_FILE.delete();
       outFileCRC.delete();
       OUTPUT_DIR.getAbsoluteFile().delete();

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Fri Jun 15 14:56:37 2007
@@ -22,13 +22,12 @@
 import java.util.Date;
 import java.util.Random;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.ToolBase;
 
 /**
  * This program uses map/reduce to just run a distributed job where there is
@@ -61,8 +60,11 @@
  *     <value>1099511627776</value>
  *   </property>
  * </configuration></xmp>
+ * 
+ * Equivalently, {@link RandomWriter} also supports all the above options
+ * and ones supported by {@link ToolBase} via the command-line.
  */
-public class RandomWriter {
+public class RandomWriter extends ToolBase {
   
   /**
    * User counters
@@ -88,7 +90,7 @@
       InputSplit[] result = new InputSplit[numSplits];
       Path outDir = job.getOutputPath();
       for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, job);
       }
       return result;
     }
@@ -133,8 +135,6 @@
   }
 
   static class Map extends MapReduceBase implements Mapper {
-    private FileSystem fileSys = null;
-    private JobConf jobConf = null;
     private long numBytesToWrite;
     private int minKeySize;
     private int keySizeRange;
@@ -143,7 +143,6 @@
     private Random random = new Random();
     private BytesWritable randomKey = new BytesWritable();
     private BytesWritable randomValue = new BytesWritable();
-    private Path outputDir = null;
     
     private void randomizeBytes(byte[] data, int offset, int length) {
       for(int i=offset + length - 1; i >= offset; --i) {
@@ -158,12 +157,6 @@
                     Writable value,
                     OutputCollector output, 
                     Reporter reporter) throws IOException {
-      String filename = ((Text) key).toString();
-      SequenceFile.Writer writer = 
-        SequenceFile.createWriter(fileSys, jobConf, 
-                                  new Path(outputDir, filename), 
-                                  BytesWritable.class, BytesWritable.class,
-                                  CompressionType.NONE, reporter);
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 
@@ -174,7 +167,7 @@
           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
         randomValue.setSize(valueLength);
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
-        writer.append(randomKey, randomValue);
+        output.collect(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
         reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
         reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
@@ -184,7 +177,6 @@
         }
       }
       reporter.setStatus("done with " + itemCount + " records.");
-      writer.close();
     }
     
     /**
@@ -192,14 +184,6 @@
      * the data.
      */
     public void configure(JobConf job) {
-      jobConf = job;
-      try {
-        fileSys = FileSystem.get(job);
-      } catch (IOException e) {
-        throw new RuntimeException("Can't get default file system", e);
-      }
-      outputDir = job.getOutputPath();
-      
       numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                     1*1024*1024*1024);
       minKeySize = job.getInt("test.randomwrite.min_key", 10);
@@ -219,18 +203,15 @@
    * 
    * @throws IOException 
    */
-  public static void main(String[] args) throws IOException {
+  public int run(String[] args) throws Exception {    
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir> [<config>]");
-      return;
+      return -1;
     }
+    
     Path outDir = new Path(args[0]);
-    JobConf job;
-    if (args.length >= 2) {
-      job = new JobConf(new Path(args[1]));
-    } else {
-      job = new JobConf();
-    }
+    JobConf job = new JobConf(conf);
+    
     job.setJarByClass(RandomWriter.class);
     job.setJobName("random-writer");
     job.setOutputPath(outDir);
@@ -241,7 +222,7 @@
     job.setInputFormat(RandomInputFormat.class);
     job.setMapperClass(Map.class);        
     job.setReducerClass(IdentityReducer.class);
-    job.setOutputFormat(NullOutputFormat.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
     
     JobClient client = new JobClient(job);
     ClusterStatus cluster = client.getClusterStatus();
@@ -250,7 +231,7 @@
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
-      System.exit(-1);
+      return -2;
     }
     long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
@@ -262,7 +243,9 @@
     
     job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
-    job.setNumReduceTasks(1);
+    
+    // reducer NONE
+    job.setNumReduceTasks(0);
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
@@ -272,6 +255,13 @@
     System.out.println("The job took " + 
                        (endTime.getTime() - startTime.getTime()) /1000 + 
                        " seconds.");
+    
+    return 0;
   }
   
+  public static void main(String[] args) throws Exception {
+    int res = new RandomWriter().doMain(new Configuration(), args);
+    System.exit(res);
+  }
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 15 14:56:37 2007
@@ -239,7 +239,7 @@
         JobConf job, Reporter reporter) throws IOException {
       this.job = job;
       this.reporter = reporter;
-      String finalName = getTipId();
+      String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(this.job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 15 14:56:37 2007
@@ -26,7 +26,6 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.text.DecimalFormat;
-import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -373,20 +372,6 @@
       throw ioe;
     }
     done(umbilical);
-  }
-
-  /** Construct output file names so that, when an output directory listing is
-   * sorted lexicographically, positions correspond to output partitions.*/
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  static synchronized String getOutputName(int partition) {
-    return "part-" + NUMBER_FORMAT.format(partition);
   }
 
   private class ReduceCopier implements MRConstants {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jun 15 14:56:37 2007
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
+import java.text.NumberFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,8 +55,23 @@
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS
   }
+
+  ///////////////////////////////////////////////////////////
+  // Helper methods to construct task-output paths
+  ///////////////////////////////////////////////////////////
   
-  
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  static synchronized String getOutputName(int partition) {
+    return "part-" + NUMBER_FORMAT.format(partition);
+  }
+
   ////////////////////////////////////////////
   // Fields
   ////////////////////////////////////////////

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Jun 15 14:56:37 2007
@@ -305,7 +305,7 @@
       assertTrue("job was complete", rj.isComplete());
       assertTrue("job was successful", rj.isSuccessful());
       Path output = new Path(outDir,
-                             ReduceTask.getOutputName(0));
+                             Task.getOutputName(0));
       assertTrue("reduce output exists " + output, fs.exists(output));
       SequenceFile.Reader rdr = 
         new SequenceFile.Reader(fs, output, conf);