You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/11/23 21:10:02 UTC

svn commit: r1038314 - in /avro/trunk: ./ lang/java/ lang/java/src/java/org/apache/avro/mapred/ lang/java/src/test/java/org/apache/avro/mapred/

Author: cutting
Date: Tue Nov 23 20:10:02 2010
New Revision: 1038314

URL: http://svn.apache.org/viewvc?rev=1038314&view=rev
Log:
AVRO-698. Java: Add MapReduce tests and documentation for jobs that mix Avro and non-Avro data.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/build.xml
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Nov 23 20:10:02 2010
@@ -36,6 +36,9 @@ Avro 1.5.0 (unreleased)
     AVRO-648. Java: Use Velocity templates to generate specific code.
     (philz via cutting)
 
+    AVRO-698. Java: Add MapReduce tests and documentation for jobs
+    that mix Avro and non-Avro data. (cutting)
+
   BUG FIXES
 
     AVRO-675. C: Bytes and fixed setters don't update datum size.

Modified: avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Tue Nov 23 20:10:02 2010
@@ -75,7 +75,7 @@
   <property name="javadoc.link.servlet"
 	    value="http://java.sun.com/products/servlet/2.3/javadoc/"/>
   <property name="javadoc.link.hadoop"
-	    value="http://hadoop.apache.org/common/docs/current/api/"/>
+	    value="http://hadoop.apache.org/common/docs/r0.20.0/api/"/>
   <property name="javadoc.packages" value="org.${org}.${name}.*"/>
 
   <property name="javac.encoding" value="ISO-8859-1"/>

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Tue Nov 23 20:10:02 2010
@@ -24,6 +24,8 @@ import java.io.UnsupportedEncodingExcept
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.avro.Schema;
 
 /** Setters to configure jobs for Avro data. */
@@ -48,7 +50,7 @@ public class AvroJob {
   /** Configure a job's map input schema. */
   public static void setInputSchema(JobConf job, Schema s) {
     job.set(INPUT_SCHEMA, s.toString());
-    configureAvroJob(job);
+    configureAvroInput(job);
   }
 
   /** Return a job's map input schema. */
@@ -61,7 +63,7 @@ public class AvroJob {
    * be a {@link Pair} schema. */
   public static void setMapOutputSchema(JobConf job, Schema s) {
     job.set(MAP_OUTPUT_SCHEMA, s.toString());
-    configureAvroJob(job);
+    configureAvroShuffle(job);
   }
 
   /** Return a job's map output key schema. */
@@ -73,7 +75,7 @@ public class AvroJob {
    * must be a {@link Pair} schema. */
   public static void setOutputSchema(JobConf job, Schema s) {
     job.set(OUTPUT_SCHEMA, s.toString());
-    configureAvroJob(job);
+    configureAvroOutput(job);
   }
 
   /** Add metadata to job output files.*/
@@ -105,21 +107,32 @@ public class AvroJob {
     return Schema.parse(job.get(OUTPUT_SCHEMA));
   }
 
-  private static void configureAvroJob(JobConf job) {
+  private static void configureAvroInput(JobConf job) {
     if (job.get("mapred.input.format.class") == null)
       job.setInputFormat(AvroInputFormat.class);
+
+    if (job.getMapperClass() == IdentityMapper.class)
+      job.setMapperClass(HadoopMapper.class);
+
+    configureAvroShuffle(job);
+  }
+
+  private static void configureAvroOutput(JobConf job) {
     if (job.get("mapred.output.format.class") == null)
       job.setOutputFormat(AvroOutputFormat.class);
 
+    if (job.getReducerClass() == IdentityReducer.class)
+      job.setReducerClass(HadoopReducer.class);
+
     job.setOutputKeyClass(AvroWrapper.class);
+    configureAvroShuffle(job);
+  }
+
+  private static void configureAvroShuffle(JobConf job) {
     job.setOutputKeyComparatorClass(AvroKeyComparator.class);
     job.setMapOutputKeyClass(AvroKey.class);
     job.setMapOutputValueClass(AvroValue.class);
 
-
-    job.setMapperClass(HadoopMapper.class);
-    job.setReducerClass(HadoopReducer.class);
-
     // add AvroSerialization to io.serializations
     Collection<String> serializations =
       job.getStringCollection("io.serializations");

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Tue Nov 23 20:10:02 2010
@@ -23,10 +23,20 @@ Avro data, with map and reduce functions
 
 <p>Avro data files do not contain key/value pairs as expected by
   Hadoop's MapReduce API, but rather just a sequence of values.  Thus
-  we provide here a layer on top of Hadoop's MapReduce API which
-  eliminates the key/value distinction.</p>
+  we provide here a layer on top of Hadoop's MapReduce API.</p>
 
-<p>To use this for jobs whose input and output are Avro data files:
+<p>In all cases, input and output paths are set and jobs are submitted
+  as with standard Hadoop jobs:
+ <ul>
+   <li>Specify input files with {@link
+   org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
+   <li>Specify an output directory with {@link
+   org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
+   <li>Run your job with {@link org.apache.hadoop.mapred.JobClient#runJob}</li>
+ </ul>
+</p>
+
+<p>For jobs whose input and output are Avro data files:
  <ul>
    <li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} and
    {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
@@ -38,11 +48,53 @@ Avro data, with map and reduce functions
    this as your job's reducer and perhaps combiner, with {@link
    org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
    org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
-   <li>Specify input files with {@link org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
-   <li>Specify an output directory with {@link
-   org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
-   <li>Run your job with {@link org.apache.hadoop.mapred.JobClient#runJob}</li>
  </ul>
 </p>
+
+<p>For jobs whose input is an Avro data file and which use an {@link
+  org.apache.avro.mapred.AvroMapper}, but whose reducer is a non-Avro
+  {@link org.apache.hadoop.mapred.Reducer} and whose output is a
+  non-Avro format:
+ <ul>
+   <li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} with your
+   job's input schema.</li>
+   <li>Subclass {@link org.apache.avro.mapred.AvroMapper} and specify
+   this as your job's mapper with {@link
+   org.apache.avro.mapred.AvroJob#setMapperClass}</li>
+   <li>Implement {@link org.apache.hadoop.mapred.Reducer} and specify
+   your job's reducer and combiner with {@link
+   org.apache.hadoop.mapred.JobConf#setReducerClass} and {@link
+   org.apache.hadoop.mapred.JobConf#setCombinerClass}.  The input key
+   and value types should be {@link org.apache.avro.mapred.AvroKey} and {@link
+   org.apache.avro.mapred.AvroValue}.</li>
+   <li>Specify your job's output key and value types {@link
+   org.apache.hadoop.mapred.JobConf#setOutputKeyClass} and {@link
+   org.apache.hadoop.mapred.JobConf#setOutputValueClass}.</li>
+   <li>Specify your job's output format {@link
+   org.apache.hadoop.mapred.JobConf#setOutputFormat}.</li>
+ </ul>
+</p>
+
+<p>For jobs whose input is non-Avro data file and which use a
+  non-Avro {@link org.apache.hadoop.mapred.Mapper}, but whose reducer
+  is an {@link org.apache.avro.mapred.AvroReducer} and whose output is
+  an Avro data file:
+ <ul>
+   <li>Set your input file format with {@link
+   org.apache.hadoop.mapred.JobConf#setInputFormat}.</li>
+   <li>Implement {@link org.apache.hadoop.mapred.Mapper} and specify
+   your job's mapper with {@link
+   org.apache.hadoop.mapred.JobConf#setMapperClass}.  The output key
+   and value type should be {@link org.apache.avro.mapred.AvroKey} and
+   {@link org.apache.avro.mapred.AvroValue}.</li>
+   <li>Subclass {@link org.apache.avro.mapred.AvroReducer} and specify
+   this as your job's reducer and perhaps combiner, with {@link
+   org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
+   org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
+   <li>Call {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+   job's output schema.</li>
+ </ul>
+</p>
+
 </body>
 </html>

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Tue Nov 23 20:10:02 2010
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEqu
 import java.io.IOException;
 import java.io.File;
 import java.net.URI;
+import java.util.Iterator;
 
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.conf.Configuration;
@@ -33,11 +34,19 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,6 +58,10 @@ public class TestSequenceFileReader {
     = new File(System.getProperty("test.dir", "/tmp"));
   private static final File FILE = new File(DIR, "test.seq");
 
+  private static final Schema SCHEMA
+    = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+                         Schema.create(Schema.Type.STRING));
+
   @BeforeClass
   public static void testWriteSequenceFile() throws IOException {
     FILE.delete();
@@ -91,15 +104,16 @@ public class TestSequenceFileReader {
 
     output.getFileSystem(job).delete(output);
     
-    Schema schema = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
-                                       Schema.create(Schema.Type.STRING));
-
+    // configure input for Avro from sequence file
     AvroJob.setInputSequenceFile(job);
+    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+    AvroJob.setInputSchema(job, SCHEMA);
 
-    AvroJob.setInputSchema(job, schema);
-    AvroJob.setOutputSchema(job, schema);
+    // mapper is default, identity
+    // reducer is default, identity
 
-    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+    // configure output for avro
+    AvroJob.setOutputSchema(job, SCHEMA);
     FileOutputFormat.setOutputPath(job, output);
     
     JobClient.runJob(job);
@@ -109,4 +123,88 @@ public class TestSequenceFileReader {
                new SpecificDatumReader<Pair<Long,CharSequence>>()));
   }
 
+  private static class NonAvroMapper
+    extends MapReduceBase
+    implements Mapper<LongWritable,Text,AvroKey<Long>,AvroValue<Utf8>> {
+    
+    public void map(LongWritable key, Text value, 
+                  OutputCollector<AvroKey<Long>,AvroValue<Utf8>> out, 
+                  Reporter reporter) throws IOException {
+      out.collect(new AvroKey<Long>(key.get()),
+                  new AvroValue<Utf8>(new Utf8(value.toString())));
+    }
+  }
+
+  @Test
+  public void testNonAvroMapper() throws Exception {
+    JobConf job = new JobConf();
+    Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+    output.getFileSystem(job).delete(output);
+    
+    // configure input for non-Avro sequence file
+    job.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+    // use a hadoop mapper that emits Avro output
+    job.setMapperClass(NonAvroMapper.class);
+
+    // reducer is default, identity
+
+    // configure output for avro
+    FileOutputFormat.setOutputPath(job, output);
+    AvroJob.setOutputSchema(job, SCHEMA);
+
+    JobClient.runJob(job);
+
+    checkFile(new DataFileReader<Pair<Long,CharSequence>>
+              (new File(output.toString()+"/part-00000.avro"),
+               new SpecificDatumReader<Pair<Long,CharSequence>>()));
+  }
+
+  private static class NonAvroReducer
+    extends MapReduceBase
+    implements Reducer<AvroKey<Long>,AvroValue<Utf8>,LongWritable,Text> {
+    
+    public void reduce(AvroKey<Long> key, Iterator<AvroValue<Utf8>> values,
+                       OutputCollector<LongWritable, Text> out, 
+                       Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        AvroValue<Utf8> value = values.next();
+        out.collect(new LongWritable(key.datum()),
+                    new Text(value.datum().toString()));
+      }
+    }
+  }
+
+  @Test
+  public void testNonAvroReducer() throws Exception {
+    JobConf job = new JobConf();
+    Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+    output.getFileSystem(job).delete(output);
+    
+    // configure input for Avro from sequence file
+    AvroJob.setInputSequenceFile(job);
+    AvroJob.setInputSchema(job, SCHEMA);
+    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+    // mapper is default, identity
+
+    // use a hadoop reducer that consumes Avro input
+    AvroJob.setMapOutputSchema(job, SCHEMA);
+    job.setReducerClass(NonAvroReducer.class);
+
+    // configure output for non-Avro SequenceFile
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, output);
+
+    // output key/value classes are default, LongWritable/Text
+
+    JobClient.runJob(job);
+
+    checkFile(new SequenceFileReader<Long,CharSequence>
+              (new File(output.toString()+"/part-00000")));
+  }
+
 }

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java Tue Nov 23 20:10:02 2010
@@ -56,7 +56,7 @@ public class TestWeather {
     job.setJobName("identity map weather");
     
     AvroJob.setInputSchema(job, Weather.SCHEMA$);
-    AvroJob.setMapOutputSchema(job, Weather.SCHEMA$);
+    AvroJob.setOutputSchema(job, Weather.SCHEMA$);
 
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);