You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/07/21 23:14:31 UTC

svn commit: r966422 [1/2] - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/generic/ lang/java/src/java/org/apache/avro/mapred/ lang/java/src/java/org/apache/avro/mapred/tether/ lang/java/src/java/org/apache/avro/reflect/ lang/java/src/java/org/a...

Author: cutting
Date: Wed Jul 21 21:14:29 2010
New Revision: 966422

URL: http://svn.apache.org/viewvc?rev=966422&view=rev
Log:
AVRO-580, AVRO-581.  Java: Update MapReduce APIs to use key/value pairs for intermediate data.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc
    avro/trunk/share/test/data/
    avro/trunk/share/test/data/weather-sorted.avro   (with props)
    avro/trunk/share/test/data/weather.avro   (with props)
    avro/trunk/share/test/data/weather.json
    avro/trunk/share/test/schemas/weather.avsc
Removed:
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
    avro/trunk/share/test/schemas/WordCount.avsc
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java
    avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java
    avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html
    avro/trunk/lang/java/src/test/bin/test_tools.sh
    avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/specific/TestSpecificData.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Jul 21 21:14:29 2010
@@ -32,6 +32,10 @@ Avro 1.4.0 (unreleased)
     AVRO-405: Java: Add Netty-based RPC transceiver and server
     implementation. (Harry Wang via cutting)
 
+    AVRO-580. Permit intermixing of generic and specific data.
+    SpecificDatumReader and SpecificDatumWriter will now use generic
+    types when no specific class is available.  (cutting)
+
   IMPROVEMENTS
     AVRO-584. Update Histogram for Stats Plugin
     (Patrick Wendell via philz)
@@ -69,6 +73,9 @@ Avro 1.4.0 (unreleased)
     AVRO-596. Start Netty server eagerly in constructor.
     (Patrick Linehan via cutting)
 
+    AVRO-581. Java: Update MapReduce APIs to use key/value pairs for
+    intermediate data.  (cutting)
+
   BUG FIXES
 
     AVRO-502. Memory leak from parsing JSON schema.

Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java Wed Jul 21 21:14:29 2010
@@ -19,8 +19,12 @@ package org.apache.avro.generic;
 
 /** A record implementation that permits field access by integer index.*/
 public interface IndexedRecord extends GenericContainer {
-  /** Set the value of a field given its position in the schema. */
+  /** Set the value of a field given its position in the schema.
+   * <p>This method is not meant to be called by user code, but only by {@link
+   * org.apache.avro.io.DatumReader} implementations. */
   void put(int i, Object v);
-  /** Return the value of a field given its position in the schema. */
+  /** Return the value of a field given its position in the schema.
+   * <p>This method is not meant to be called by user code, but only by {@link
+   * org.apache.avro.io.DatumWriter} implementations. */
   Object get(int i);
 }

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+
+/** A collector for map and reduce output. */
+public abstract class AvroCollector<T> extends Configured {
+  public abstract void collect(T datum) throws IOException;
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Wed Jul 21 21:14:29 2010
@@ -20,7 +20,6 @@ package org.apache.avro.mapred;
 
 import java.util.Collection;
 
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.avro.Schema;
@@ -29,12 +28,9 @@ import org.apache.avro.Schema;
 public class AvroJob {
   private AvroJob() {}                            // no public ctor
 
-  static final String API_GENERIC = "generic";
-  static final String API_SPECIFIC = "specific";
-
-  static final String INPUT_API = "avro.input.api";
-  static final String OUTPUT_API = "avro.output.api";
-  static final String MAP_OUTPUT_API = "avro.map.output.api";
+  static final String MAPPER = "avro.mapper";
+  static final String COMBINER = "avro.combiner";
+  static final String REDUCER = "avro.reducer";
 
   /** The configuration key for a job's input schema. */
   public static final String INPUT_SCHEMA = "avro.input.schema";
@@ -43,86 +39,86 @@ public class AvroJob {
   /** The configuration key for a job's output schema. */
   public static final String OUTPUT_SCHEMA = "avro.output.schema";
 
-  /** Configure a job's map input to use Avro's generic API. */
-  public static void setInputGeneric(JobConf job, Schema s) {
-    job.set(INPUT_API, API_GENERIC);
-    configureAvroInput(job, s);
-  }
-
-  /** Configure a job's map input to use Avro's specific API. */
-  public static void setInputSpecific(JobConf job, Schema s) {
-    job.set(INPUT_API, API_SPECIFIC);
-    configureAvroInput(job, s);
-  }
-
-  private static void configureAvroInput(JobConf job, Schema s) {
+  /** Configure a job's map input schema. */
+  public static void setInputSchema(JobConf job, Schema s) {
     job.set(INPUT_SCHEMA, s.toString());
-    job.setInputFormat(AvroInputFormat.class);
-  }
-
-  /** Configure a job's map output key schema using Avro's generic API. */
-  public static void setMapOutputGeneric(JobConf job, Schema s) {
-    job.set(MAP_OUTPUT_API, API_GENERIC);
-    setMapOutputSchema(job, s);
-    configureAvroOutput(job);
-  }
-
-  /** Configure a job's map output key schema using Avro's specific API. */
-  public static void setMapOutputSpecific(JobConf job, Schema s) {
-    job.set(MAP_OUTPUT_API, API_SPECIFIC);
-    setMapOutputSchema(job, s);
-    configureAvroOutput(job);
-  }
-
-  /** Configure a job's output key schema using Avro's generic API. */
-  public static void setOutputGeneric(JobConf job, Schema s) {
-    job.set(OUTPUT_API, API_GENERIC);
-    setOutputSchema(job, s);
-    configureAvroOutput(job);
+    configureAvroJob(job);
   }
 
-  /** Configure a job's output key schema using Avro's specific API. */
-  public static void setOutputSpecific(JobConf job, Schema s) {
-    job.set(OUTPUT_API, API_SPECIFIC);
-    setOutputSchema(job, s);
-    configureAvroOutput(job);
+  /** Return a job's map input schema. */
+  public static Schema getInputSchema(Configuration job) {
+    return Schema.parse(job.get(INPUT_SCHEMA));
   }
 
-  /** Set a job's map output key schema. */
+  /** Configure a job's map output schema.  The map output schema defaults to
+   * the output schema and need only be specified when it differs.  Thus must
+   * be a {@link Pair} schema. */
   public static void setMapOutputSchema(JobConf job, Schema s) {
     job.set(MAP_OUTPUT_SCHEMA, s.toString());
+    configureAvroJob(job);
   }
 
   /** Return a job's map output key schema. */
   public static Schema getMapOutputSchema(Configuration job) {
-    return Schema.parse(job.get(AvroJob.MAP_OUTPUT_SCHEMA,
-                                job.get(AvroJob.OUTPUT_SCHEMA)));
+    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
   }
 
-  /** Set a job's output key schema. */
+  /** Configure a job's output schema.  Unless this is a map-only job, this
+   * must be a {@link Pair} schema. */
   public static void setOutputSchema(JobConf job, Schema s) {
     job.set(OUTPUT_SCHEMA, s.toString());
+    configureAvroJob(job);
   }
 
   /** Return a job's output key schema. */
   public static Schema getOutputSchema(Configuration job) {
-    return Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+    return Schema.parse(job.get(OUTPUT_SCHEMA));
   }
 
-  private static void configureAvroOutput(JobConf job) {
+  private static void configureAvroJob(JobConf job) {
+    if (job.get("mapred.input.format.class") == null)
+      job.setInputFormat(AvroInputFormat.class);
+    if (job.get("mapred.output.format.class") == null)
+      job.setOutputFormat(AvroOutputFormat.class);
+
     job.setOutputKeyClass(AvroWrapper.class);
     job.setOutputKeyComparatorClass(AvroKeyComparator.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormat(AvroOutputFormat.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+
+
+    job.setMapperClass(HadoopMapper.class);
+    job.setReducerClass(HadoopReducer.class);
 
-    // add AvroKeySerialization to io.serializations
+    // add AvroSerialization to io.serializations
     Collection<String> serializations =
       job.getStringCollection("io.serializations");
-    if (!serializations.contains(AvroKeySerialization.class.getName())) {
-      serializations.add(AvroKeySerialization.class.getName());
+    if (!serializations.contains(AvroSerialization.class.getName())) {
+      serializations.add(AvroSerialization.class.getName());
       job.setStrings("io.serializations",
                      serializations.toArray(new String[0]));
     }
   }
 
+  /** Configure a job's mapper implementation. */
+  public static void setMapperClass(JobConf job,
+                                    Class<? extends AvroMapper> c) {
+    job.set(MAPPER, c.getName());
+  }
+
+  /** Configure a job's combiner implementation. */
+  public static void setCombinerClass(JobConf job,
+                                      Class<? extends AvroReducer> c) {
+    job.set(COMBINER, c.getName());
+    job.setCombinerClass(HadoopCombiner.class);
+  }
+
+  /** Configure a job's reducer implementation. */
+  public static void setReducerClass(JobConf job,
+                                     Class<? extends AvroReducer> c) {
+    job.set(REDUCER, c.getName());
+  }
+
+  
+
 }

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of keys for jobs configured with {@link AvroJob} . */
+public class AvroKey<T> extends AvroWrapper<T> {
+  /** Wrap a key. */
+  public AvroKey(T datum) { super(datum); }
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java Wed Jul 21 21:14:29 2010
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configurat
 
 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryData;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificData;
 
 /** The {@link RawComparator} used by jobs configured with {@link AvroJob}. */
@@ -32,29 +31,20 @@ public class AvroKeyComparator<T>
   extends Configured implements RawComparator<AvroWrapper<T>> {
 
   private Schema schema;
-  private GenericData model;
 
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
-    if (conf != null) {
-      schema = AvroJob.getMapOutputSchema(conf);
-      String api = getConf().get(AvroJob.MAP_OUTPUT_API,
-                                 getConf().get(AvroJob.OUTPUT_API));
-      model = AvroJob.API_SPECIFIC.equals(api)
-        ? SpecificData.get()
-        : GenericData.get();
-    }
+    if (conf != null)
+      schema = Pair.getKeySchema(AvroJob.getMapOutputSchema(conf));
   }
 
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-    int diff = BinaryData.compare(b1, s1, b2, s2, schema);
-    return diff == 0 ? -1 : diff;
+    return BinaryData.compare(b1, s1, b2, s2, schema);
   }
 
   public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
-    int diff = model.compare(x.datum(), y.datum(), schema);
-    return diff == 0 ? -1 : diff;
+    return SpecificData.get().compare(x.datum(), y.datum(), schema);
   }
 
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java Wed Jul 21 21:14:29 2010
@@ -20,47 +20,22 @@ package org.apache.avro.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.conf.Configured;
 
-/** A {@link Mapper} for Avro data.
+/** A mapper for Avro data.
  *
- * <p>Applications should subclass this class and pass their subclass to {@link
- * org.apache.hadoop.mapred.JobConf#setMapperClass(Class)}.  Subclasses must
- * override {@link #map} and may call {@link #collect} to generate output.
+ * <p>Applications subclass this class and pass their subclass to {@link
+ * AvroJob#setMapperClass}, overriding {@link #map}.
  */
-public abstract class AvroMapper<IN,OUT> extends MapReduceBase
-  implements Mapper<AvroWrapper<IN>, NullWritable,
-                    AvroWrapper<OUT>, NullWritable> {
-    
-  private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
-  private Reporter reporter;
-
-  public void map(AvroWrapper<IN> wrapper, NullWritable value, 
-                  OutputCollector<AvroWrapper<OUT>, NullWritable> output, 
-                  Reporter reporter) throws IOException {
-    if (this.out == null) {
-      this.out = output;
-      this.reporter = reporter;
-    }
-    map(wrapper.datum());
-  }
-
-  /** Return the {@link Reporter} to permit status updates. */
-  public Reporter getReporter() { return reporter; }
-
-  /** Called with each map input datum. */
-  public abstract void map(IN datum) throws IOException;
+public class AvroMapper<IN,OUT> extends Configured {
 
-  private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
-
-  /** Call with each map output datum. */
-  public void collect(OUT datum) throws IOException {
-    outputWrapper.datum(datum);
-    out.collect(outputWrapper, NullWritable.get());
+  /** Called with each map input datum.  By default, collects inputs. */
+  @SuppressWarnings("unchecked")
+  public void map(IN datum, AvroCollector<OUT> collector, Reporter reporter)
+    throws IOException {
+    collector.collect((OUT)datum);
   }
 
+
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Wed Jul 21 21:14:29 2010
@@ -30,8 +30,6 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.util.Progressable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.CodecFactory;
@@ -60,14 +58,13 @@ public class AvroOutputFormat <T>
                     String name, Progressable prog)
     throws IOException {
 
-    Schema schema = AvroJob.getOutputSchema(job);
+    boolean isMapOnly = job.getNumReduceTasks() == 0;
+    Schema schema = isMapOnly
+      ? AvroJob.getMapOutputSchema(job)
+      : AvroJob.getOutputSchema(job);
 
-    DatumWriter<T> datumWriter =
-      AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))
-      ? new SpecificDatumWriter<T>()
-      : new GenericDatumWriter<T>();
-
-    final DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
+    final DataFileWriter<T> writer =
+      new DataFileWriter<T>(new SpecificDatumWriter<T>());
 
     if (FileOutputFormat.getCompressOutput(job)) {
       int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Wed Jul 21 21:14:29 2010
@@ -25,9 +25,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 
 /** An {@link RecordReader} for Avro data files. */
@@ -42,12 +41,9 @@ public class AvroRecordReader<T>
   public AvroRecordReader(JobConf job, FileSplit split)
     throws IOException {
     this.in = new FsInput(split.getPath(), job);
-    DatumReader<T> datumReader =
-      AvroJob.API_SPECIFIC.equals(job.get(AvroJob.INPUT_API))
-      ? new SpecificDatumReader<T>()
-      : new GenericDatumReader<T>();
 
-    this.reader = new DataFileReader<T>(in, datumReader);
+    Schema s = AvroJob.getInputSchema(job);
+    this.reader = new DataFileReader<T>(in, new SpecificDatumReader<T>(s));
 
     reader.sync(split.getStart());                    // sync to start
     this.start = in.tell();

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java Wed Jul 21 21:14:29 2010
@@ -19,54 +19,33 @@
 package org.apache.avro.mapred;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.conf.Configured;
 
-/** A {@link Reducer} for Avro data.
+/** A reducer for Avro data.
  *
  * <p>Applications should subclass this class and pass their subclass to {@link
- * org.apache.hadoop.mapred.JobConf#setReducerClass(Class)} and perhaps {@link
- * org.apache.hadoop.mapred.JobConf#setCombinerClass(Class)} Subclasses must
- * override {@link #reduce} and may call {@link #collect} to generate output.
- *
- * <p>Note that reducers here are not passed an iterator of all matching
- * values.  Rather, the reducer is called with every value.  If values are to
- * be combined then the reducer must maintain state accordingly.  The final
- * value may be flushed by overriding {@link #close} to call {@link #collect}.
+ * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
+ * Subclasses override {@link #reduce}.
  */
-public abstract class AvroReducer<IN,OUT> extends MapReduceBase
-  implements Reducer<AvroWrapper<IN>, NullWritable,
-                     AvroWrapper<OUT>, NullWritable> {
-    
-  private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
-  private Reporter reporter;
 
-  private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+public class AvroReducer<K,V,OUT> extends Configured {
+
+  private Pair<K,V> outputPair;
 
-  public void reduce(AvroWrapper<IN> wrapper, Iterator<NullWritable> ignore,
-                     OutputCollector<AvroWrapper<OUT>,NullWritable> output, 
+  /** Called with all map output values with a given key.  By default, pairs
+   * key with each value, collecting {@link Pair} instances. */
+  @SuppressWarnings("unchecked")
+  public void reduce(K key, Iterable<V> values,
+                     AvroCollector<OUT> collector,
                      Reporter reporter) throws IOException {
-    if (this.out == null) {
-      this.out = output;
-      this.reporter = reporter;
+    if (outputPair == null)
+      outputPair = new Pair<K,V>(AvroJob.getOutputSchema(getConf()));
+    for (V value : values) {
+      outputPair.set(key, value);
+      collector.collect((OUT)outputPair);
     }
-    reduce(wrapper.datum());
-  }
-    
-  /** Return the {@link Reporter} to permit status updates. */
-  public Reporter getReporter() { return reporter; }
-
-  /** Called with each reduce input datum for this partition, in order. */
-  public abstract void reduce(IN datum) throws IOException;
-
-  /** Call with each final output datum. */
-  public void collect(OUT datum) throws IOException {
-    outputWrapper.datum(datum);
-    out.collect(outputWrapper, NullWritable.get());
   }
+
 }

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
+public class AvroSerialization<T> extends Configured 
+  implements Serialization<AvroWrapper<T>> {
+
+  public boolean accept(Class<?> c) {
+    return AvroWrapper.class.isAssignableFrom(c);
+  }
+  
+  /** Returns the specified map output deserializer.  Defaults to the final
+   * output deserializer if no map output schema was specified. */
+  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+    //  We need not rely on mapred.task.is.map here to determine whether map
+    //  output or final output is desired, since the mapreduce framework never
+    //  creates a deserializer for final output, only for map output.
+    boolean isKey = AvroKey.class.isAssignableFrom(c);
+    Schema schema = isKey
+      ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
+      : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf()));
+    return new AvroWrapperDeserializer(new SpecificDatumReader<T>(schema),
+                                       isKey);
+  }
+  
+  private static final DecoderFactory FACTORY = new DecoderFactory();
+  static { FACTORY.configureDirectDecoder(true); }
+
+  private class AvroWrapperDeserializer
+    implements Deserializer<AvroWrapper<T>> {
+
+    private DatumReader<T> reader;
+    private BinaryDecoder decoder;
+    private boolean isKey;
+    
+    public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
+      this.reader = reader;
+      this.isKey = isKey;
+    }
+    
+    public void open(InputStream in) {
+      this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+    }
+    
+    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
+      throws IOException {
+      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
+      if (wrapper == null) {
+        wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum);
+      } else {
+        wrapper.datum(datum);
+      }
+      return wrapper;
+    }
+
+    public void close() throws IOException {
+      decoder.inputStream().close();
+    }
+    
+  }
+  
+  /** Returns the specified output serializer. */
+  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+    // Here we must rely on mapred.task.is.map to tell whether the map output
+    // or final output is needed.
+    boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
+    Schema schema = !isMap
+      ? AvroJob.getOutputSchema(getConf())
+      : (AvroKey.class.isAssignableFrom(c)
+         ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
+         : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf())));
+    return new AvroWrapperSerializer(new SpecificDatumWriter<T>(schema));
+  }
+
+  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
+
+    private DatumWriter<T> writer;
+    private OutputStream out;
+    private BinaryEncoder encoder;
+    
+    public AvroWrapperSerializer(DatumWriter<T> writer) {
+      this.writer = writer;
+    }
+
+    public void open(OutputStream out) {
+      this.out = out;
+      this.encoder = new BinaryEncoder(out);
+    }
+
+    public void serialize(AvroWrapper<T> wrapper) throws IOException {
+      writer.write(wrapper.datum(), encoder);
+    }
+
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of values for jobs configured with {@link AvroJob} . */
+public class AvroValue<T> extends AvroWrapper<T> {
+  /** Wrap a value. */
+  public AvroValue(T datum) { super(datum); }
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java Wed Jul 21 21:14:29 2010
@@ -18,11 +18,11 @@
 
 package org.apache.avro.mapred;
 
-/** The wrapper of values for jobs configured with {@link AvroJob} . */
+/** The wrapper of data for jobs configured with {@link AvroJob} . */
 public class AvroWrapper<T> {
   private T datum;
 
-  /** Wrap a value datum. */
+  /** Wrap a datum. */
   public AvroWrapper(T datum) { this.datum = datum; }
 
   /** Return the wrapped datum. */

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Reducer} and an {@link
+ * AvroReducer} used when combining.  When combining, map output pairs must be
+ * split before they're collected. */
+class HadoopCombiner<K,V>
+  extends HadoopReducerBase<K,V,Pair<K,V>,AvroKey<K>,AvroValue<V>> {
+
+  @Override @SuppressWarnings("unchecked")
+  protected AvroReducer<K,V,Pair<K,V>> getReducer(JobConf conf) {
+    return ReflectionUtils.newInstance
+      (conf.getClass(AvroJob.COMBINER, AvroReducer.class, AvroReducer.class),
+       conf);
+  }
+
+  private class PairCollector extends AvroCollector<Pair<K,V>> {
+    private final AvroKey<K> keyWrapper = new AvroKey<K>(null);
+    private final AvroValue<V> valueWrapper = new AvroValue<V>(null);
+    private OutputCollector<AvroKey<K>,AvroValue<V>> collector;
+  
+    public PairCollector(OutputCollector<AvroKey<K>,AvroValue<V>> collector) {
+      this.collector = collector;
+    }
+
+    public void collect(Pair<K,V> datum) throws IOException {
+      keyWrapper.datum(datum.key());              // split the Pair
+      valueWrapper.datum(datum.value());
+      collector.collect(keyWrapper, valueWrapper);
+    }
+  }
+
+  @Override
+  protected AvroCollector<Pair<K,V>>
+    getCollector(OutputCollector<AvroKey<K>,AvroValue<V>> collector) {
+    return new PairCollector(collector);
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
+ * AvroMapper}.  Outputs are written directly when a job is map-only, but are
+ * otherwise assumed to be pairs that are split. */
+class HadoopMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase
+  implements Mapper<AvroWrapper<IN>, NullWritable, KO, VO> {
+    
+  private AvroMapper<IN,OUT> mapper;
+  private MapCollector out;
+  private boolean isMapOnly;
+
+  @Override @SuppressWarnings("unchecked")
+  public void configure(JobConf conf) {
+    this.mapper =
+      ReflectionUtils.newInstance
+      (conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
+       conf);
+    this.isMapOnly = conf.getNumReduceTasks() == 0;
+  }
+
+  @SuppressWarnings("unchecked")
+  private class MapCollector extends AvroCollector<OUT> {
+    private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
+    private final AvroKey<K> keyWrapper = new AvroKey(null);
+    private final AvroValue<V> valueWrapper = new AvroValue(null);
+    private OutputCollector<KO,VO> collector;
+
+    public MapCollector(OutputCollector<KO,VO> collector) {
+      this.collector = collector;
+    }
+
+    public void collect(OUT datum) throws IOException {
+      if (isMapOnly) {
+        wrapper.datum(datum);
+        collector.collect((KO)wrapper, (VO)NullWritable.get());
+      } else {                                    // split a pair
+        Pair<K,V> pair = (Pair<K,V>)datum;
+        keyWrapper.datum(pair.key());
+        valueWrapper.datum(pair.value());
+        collector.collect((KO)keyWrapper, (VO)valueWrapper);
+      }
+    }
+  }
+
+  @Override
+  public void map(AvroWrapper<IN> wrapper, NullWritable value, 
+                  OutputCollector<KO,VO> collector, 
+                  Reporter reporter) throws IOException {
+    if (this.out == null)
+      this.out = new MapCollector(collector);
+    mapper.map(wrapper.datum(), out, reporter);
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Reducer} and an {@link
+ * AvroReducer}. */
+class HadoopReducer<K,V,OUT>
+  extends HadoopReducerBase<K,V, OUT, AvroWrapper<OUT>, NullWritable> {
+
+  @Override @SuppressWarnings("unchecked")
+  protected AvroReducer<K,V,OUT> getReducer(JobConf conf) {
+    return ReflectionUtils.newInstance
+      (conf.getClass(AvroJob.REDUCER, AvroReducer.class, AvroReducer.class),
+       conf);
+  }
+
+  private class ReduceCollector extends AvroCollector<OUT> {
+    private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
+    private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
+
+    public ReduceCollector(OutputCollector<AvroWrapper<OUT>,NullWritable> out) {
+      this.out = out;
+    }
+
+    public void collect(OUT datum) throws IOException {
+      wrapper.datum(datum);
+      out.collect(wrapper, NullWritable.get());
+    }
+  }
+
+  @Override
+  protected AvroCollector<OUT>
+    getCollector(OutputCollector<AvroWrapper<OUT>, NullWritable> collector) {
+    return new ReduceCollector(collector);
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Reducer;
+
+abstract class HadoopReducerBase<K,V,OUT,KO,VO> extends MapReduceBase
+  implements Reducer<AvroKey<K>, AvroValue<V>, KO, VO> {
+  
+  private AvroReducer<K,V,OUT> reducer;
+  private AvroCollector<OUT> collector;
+  
+  protected abstract AvroReducer<K,V,OUT> getReducer(JobConf conf);
+  protected abstract AvroCollector<OUT> getCollector(OutputCollector<KO,VO> c);
+
+  @Override
+  public void configure(JobConf conf) {
+    this.reducer = getReducer(conf);
+  }
+
+  class ReduceIterable implements Iterable<V>, Iterator<V> {
+    private Iterator<AvroValue<V>> values;
+    public boolean hasNext() { return values.hasNext(); }
+    public V next() { return values.next().datum(); }
+    public void remove() { throw new UnsupportedOperationException(); }
+    public Iterator<V> iterator() { return this; }
+  }
+  private ReduceIterable reduceIterable = new ReduceIterable();
+
+  @Override
+  public final void reduce(AvroKey<K> key, Iterator<AvroValue<V>> values,
+                           OutputCollector<KO, VO> out, 
+                           Reporter reporter) throws IOException {
+    if (this.collector == null) 
+      this.collector = getCollector(out);
+    reduceIterable.values = values;
+    reducer.reduce(key.datum(), reduceIterable, collector, reporter);
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificDatumReader.SchemaConstructable;
+import org.apache.avro.util.Utf8;
+
+/** A key/value pair. */
+public class Pair<K,V>
+  implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
+
+  private static final String PAIR = Pair.class.getName();
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+
+  private Schema schema;
+  private K key;
+  private V value;
+
+  public Pair(Schema schema) {
+    checkIsPairSchema(schema);
+    this.schema = schema;
+  }
+
+  public Pair(K key, Schema keySchema, V value, Schema valueSchema) {
+    this.schema = getPairSchema(keySchema, valueSchema);
+    this.key = key;
+    this.value = value;
+  }
+
+  private static void checkIsPairSchema(Schema schema) {
+    if (!PAIR.equals(schema.getFullName()))
+      throw new IllegalArgumentException("Not a Pair schema: "+schema);
+  }
+
+  /** Return a pair's key schema. */
+  public static Schema getKeySchema(Schema pair) {
+    checkIsPairSchema(pair);
+    return pair.getField(KEY).schema();
+  }
+
+  /** Return a pair's value schema. */
+  public static Schema getValueSchema(Schema pair) {
+    checkIsPairSchema(pair);
+    return pair.getField(VALUE).schema();
+  }
+
+  private static final Map<Schema,Map<Schema,Schema>> SCHEMA_CACHE = 
+    new WeakHashMap<Schema,Map<Schema,Schema>>();
+
+  /** Get a pair schema. */
+  public static Schema getPairSchema(Schema key, Schema value) {
+    Map<Schema,Schema> valueSchemas;
+    synchronized (SCHEMA_CACHE) {
+      valueSchemas = SCHEMA_CACHE.get(key);
+      if (valueSchemas == null) {
+        valueSchemas = new WeakHashMap<Schema,Schema>();
+        SCHEMA_CACHE.put(key, valueSchemas);
+      }
+      Schema result;
+      result = valueSchemas.get(value);
+      if (result == null) {
+        result = makePairSchema(key, value);
+        valueSchemas.put(value, result);
+      }
+      return result;
+    }
+  }
+
+  private static Schema makePairSchema(Schema key, Schema value) {
+    Schema pair = Schema.createRecord(PAIR, null, null, false);
+    List<Field> fields = new ArrayList<Field>();
+    fields.add(new Field(KEY, key, "", null));
+    fields.add(new Field(VALUE, value, "", null, Field.Order.IGNORE));
+    pair.setFields(fields);
+    return pair;
+  }
+
+  @Override public Schema getSchema() { return schema; }
+
+  /** Get the key. */
+  public K key() { return key; }
+  /** Set the key. */
+  public void key(K key) { this.key = key; }
+
+  /** Get the value. */
+  public V value() { return value; }
+  /** Set the value. */
+  public void value(V value) { this.value = value; }
+
+  /** Set both the key and value. */
+  public void set(K key, V value) { this.key = key; this.value = value; }
+
+  @Override public boolean equals(Object o) {
+    if (o == this) return true;                 // identical object
+    if (!(o instanceof Pair)) return false;     // not a pair
+    Pair that = (Pair)o;
+    if (this.schema != that.schema)
+      return false;                             // not the same schema
+    return this.compareTo(that) == 0;
+  }
+  @Override public int hashCode() {
+    return GenericData.get().hashCode(this, schema);
+  }
+  @Override public int compareTo(Pair that) {
+    return GenericData.get().compare(this, that, schema);
+  }
+  @Override public String toString() {
+    return GenericData.get().toString(this);
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (i) {
+    case 0: return key;
+    case 1: return value;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i);
+    } 
+  }
+
+  @Override @SuppressWarnings("unchecked")
+  public void put(int i, Object o) {
+    switch (i) {
+    case 0: this.key = (K)o;    break;
+    case 1: this.value = (V)o;  break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i);
+    } 
+  }
+
+  private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
+  private static final Schema BYTES_SCHEMA = Schema.create(Type.BYTES);
+  private static final Schema INT_SCHEMA = Schema.create(Type.INT);
+  private static final Schema LONG_SCHEMA = Schema.create(Type.LONG);
+  private static final Schema FLOAT_SCHEMA = Schema.create(Type.FLOAT);
+  private static final Schema DOUBLE_SCHEMA = Schema.create(Type.DOUBLE);
+  private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, GenericContainer value) {
+    this((K)key, key.getSchema(), (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Utf8 value) {
+    this((K)key, key.getSchema(), (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, ByteBuffer value) {
+    this((K)key, key.getSchema(), (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Integer value) {
+    this((K)key, key.getSchema(), (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Long value) {
+    this((K)key, key.getSchema(), (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Float value) {
+    this((K)key, key.getSchema(), (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Double value) {
+    this((K)key, key.getSchema(), (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Void value) {
+    this((K)key, key.getSchema(), (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, GenericContainer value) {
+    this((K)key, STRING_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Utf8 value) {
+    this((K)key, STRING_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, ByteBuffer value) {
+    this((K)key, STRING_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Integer value) {
+    this((K)key, STRING_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Long value) {
+    this((K)key, STRING_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Float value) {
+    this((K)key, STRING_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Double value) {
+    this((K)key, STRING_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Utf8 key, Void value) {
+    this((K)key, STRING_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, GenericContainer value) {
+    this((K)key, BYTES_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Utf8 value) {
+    this((K)key, BYTES_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, ByteBuffer value) {
+    this((K)key, BYTES_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Integer value) {
+    this((K)key, BYTES_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Long value) {
+    this((K)key, BYTES_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Float value) {
+    this((K)key, BYTES_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Double value) {
+    this((K)key, BYTES_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Void value) {
+    this((K)key, BYTES_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, GenericContainer value) {
+    this((K)key, INT_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Utf8 value) {
+    this((K)key, INT_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, ByteBuffer value) {
+    this((K)key, INT_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Integer value) {
+    this((K)key, INT_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Long value) {
+    this((K)key, INT_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Float value) {
+    this((K)key, INT_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Double value) {
+    this((K)key, INT_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Integer key, Void value) {
+    this((K)key, INT_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, GenericContainer value) {
+    this((K)key, LONG_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Utf8 value) {
+    this((K)key, LONG_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, ByteBuffer value) {
+    this((K)key, LONG_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Integer value) {
+    this((K)key, LONG_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Long value) {
+    this((K)key, LONG_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Float value) {
+    this((K)key, LONG_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Double value) {
+    this((K)key, LONG_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Long key, Void value) {
+    this((K)key, LONG_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, GenericContainer value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Utf8 value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, ByteBuffer value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Integer value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Long value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Float value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Double value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Float key, Void value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, GenericContainer value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Utf8 value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, ByteBuffer value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Integer value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Long value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Float value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Double value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Double key, Void value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, GenericContainer value) {
+    this((K)key, NULL_SCHEMA, (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Utf8 value) {
+    this((K)key, NULL_SCHEMA, (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, ByteBuffer value) {
+    this((K)key, NULL_SCHEMA, (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Integer value) {
+    this((K)key, NULL_SCHEMA, (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Long value) {
+    this((K)key, NULL_SCHEMA, (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Float value) {
+    this((K)key, NULL_SCHEMA, (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Double value) {
+    this((K)key, NULL_SCHEMA, (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Void key, Void value) {
+    this((K)key, NULL_SCHEMA, (V)value, NULL_SCHEMA);
+  }
+
+  // private static final String[][] TABLE = new String[][] {
+  //   {"GenericContainer", "{0}.getSchema()"},
+  //   {"Utf8", "STRING_SCHEMA"},
+  //   {"ByteBuffer", "BYTES_SCHEMA"},
+  //   {"Integer", "INT_SCHEMA"},
+  //   {"Long", "LONG_SCHEMA"},
+  //   {"Float", "FLOAT_SCHEMA"},
+  //   {"Double", "DOUBLE_SCHEMA"},
+  //   {"Void", "NULL_SCHEMA"},
+  // };
+  
+  // private static String f(String pattern, String value) {
+  //   return java.text.MessageFormat.format(pattern, value);
+  // }
+  
+  // public static void main(String... args) throws Exception {
+  //   StringBuffer b = new StringBuffer();
+  //   for (String[] k : TABLE) {
+  //     for (String[] v : TABLE) {
+  //       b.append("@SuppressWarnings(\"unchecked\")\n");
+  //       b.append("public Pair("+k[0]+" key, "+v[0]+" value) {\n");
+  //       b.append("  this((K)key, "+f(k[1],"key")
+  //                +", (V)value, "+f(v[1],"value")+");\n");
+  //       b.append("}\n");
+  //     }
+  //   }
+  //   System.out.println(b);
+  // }
+
+
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Wed Jul 21 21:14:29 2010
@@ -28,19 +28,16 @@ Avro data, with map and reduce functions
 
 <p>To use this for jobs whose input and output are Avro data files:
  <ul>
+   <li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} and
+   {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+   job's input and output schemas.</li>
    <li>Subclass {@link org.apache.avro.mapred.AvroMapper} and specify
-   this as your job's mapper.</li>
+   this as your job's mapper with {@link
+   org.apache.avro.mapred.AvroJob#setMapperClass}</li>
    <li>Subclass {@link org.apache.avro.mapred.AvroReducer} and specify
-   this as your job's reducer and perhaps combiner.</li>
-   <li>Depending on whether your mapper uses Avro's specific or
-   generic API for inputs, call one of {@link
-   org.apache.avro.mapred.AvroJob#setInputSpecific} or {@link
-   org.apache.avro.mapred.AvroJob#setInputGeneric} with your input schema.</li>
-   <li>Depending on whether your job uses Avro's specific or generic
-   API for outputs, call one of {@link
-   org.apache.avro.mapred.AvroJob#setOutputSpecific} or {@link
-   org.apache.avro.mapred.AvroJob#setOutputGeneric} with your output
-   schema.</li>
+   this as your job's reducer and perhaps combiner, with {@link
+   org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
+   org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
    <li>Specify input files with {@link org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
    <li>Specify an output directory with {@link
    org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java Wed Jul 21 21:14:29 2010
@@ -137,9 +137,11 @@ public class TetherJob extends Configure
       FileInputFormat.addInputPaths(job, in.value(opts));
       FileOutputFormat.setOutputPath(job, out.value(opts));
       TetherJob.setExecutable(job, exec.value(opts));
-      AvroJob.setOutputSchema(job, Schema.parse(outSchema.value(opts)));
+      job.set(AvroJob.OUTPUT_SCHEMA,
+              Schema.parse(outSchema.value(opts)).toString());
       if (opts.hasArgument(mapOutSchema))
-        AvroJob.setMapOutputSchema(job, Schema.parse(mapOutSchema.value(opts)));
+        job.set(AvroJob.MAP_OUTPUT_SCHEMA,
+                Schema.parse(mapOutSchema.value(opts)).toString());
       if (opts.hasArgument(reduces))
         job.setNumReduceTasks(reduces.value(opts));
     } catch (Exception e) {

Modified: avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java Wed Jul 21 21:14:29 2010
@@ -74,7 +74,7 @@ public class ReflectDatumReader<T> exten
       }
       if (collectionClass.isAssignableFrom(ArrayList.class))
         return new ArrayList();
-      return newInstance(collectionClass);
+      return newInstance(collectionClass, schema);
     }
     Class elementClass = ReflectData.getClassProp(schema, ReflectData.ELEMENT_PROP);
     if (elementClass == null)

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java Wed Jul 21 21:14:29 2010
@@ -72,9 +72,10 @@ public class SpecificData extends Generi
 
   private Map<String,Class> classCache = new ConcurrentHashMap<String,Class>();
 
+  private static final Class NO_CLASS = new Object(){}.getClass();
   private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
 
-  /** Return the class that implements a schema. */
+  /** Return the class that implements a schema, or null if none exists. */
   public Class getClass(Schema schema) {
     switch (schema.getType()) {
     case FIXED:
@@ -85,12 +86,12 @@ public class SpecificData extends Generi
       if (c == null) {
         try {
           c = Class.forName(getClassName(schema));
-          classCache.put(name, c);
         } catch (ClassNotFoundException e) {
-          throw new AvroRuntimeException(e);
+          c = NO_CLASS;
         }
+        classCache.put(name, c);
       }
-      return c;
+      return c == NO_CLASS ? null : c;
     case ARRAY:   return GenericArray.class;
     case MAP:     return Map.class;
     case UNION:
@@ -203,14 +204,11 @@ public class SpecificData extends Generi
   public int compare(Object o1, Object o2, Schema s) {
     switch (s.getType()) {
     case ENUM:
-      return ((Enum)o1).ordinal() - ((Enum)o2).ordinal();
+      if (!(o1 instanceof String))                // not generic
+        return ((Enum)o1).ordinal() - ((Enum)o2).ordinal();
     default:
       return super.compare(o1, o2, s);
     }
   }
 
 }
-
-
-
-

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java Wed Jul 21 21:14:29 2010
@@ -39,46 +39,51 @@ public class SpecificDatumReader<T> exte
   @Override
   protected Object newRecord(Object old, Schema schema) {
     Class c = SpecificData.get().getClass(schema);
-    return (c.isInstance(old) ? old : newInstance(c));
-  }
-
-  @Override
-  protected void setField(Object record, String name, int position, Object o) {
-    ((SpecificRecord)record).put(position, o);
-  }
-  @Override
-  protected Object getField(Object record, String name, int position) {
-    return ((SpecificRecord)record).get(position);
+    if (c == null) return super.newRecord(old, schema); // punt to generic
+    return (c.isInstance(old) ? old : newInstance(c, schema));
   }
 
   @Override
   @SuppressWarnings("unchecked")
   protected Object createEnum(String symbol, Schema schema) {
-    return Enum.valueOf(SpecificData.get().getClass(schema), symbol);
+    Class c = SpecificData.get().getClass(schema);
+    if (c == null) return super.createEnum(symbol, schema); // punt to generic
+    return Enum.valueOf(c, symbol);
   }
 
   @Override
   protected Object createFixed(Object old, Schema schema) {
     Class c = SpecificData.get().getClass(schema);
-    return c.isInstance(old) ? old : newInstance(c);
+    if (c == null) return super.createFixed(old, schema); // punt to generic
+    return c.isInstance(old) ? old : newInstance(c, schema);
   }
 
-  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+  private static final Class<?>[] NO_ARG = new Class[]{};
+  private static final Class<?>[] SCHEMA_ARG = new Class[]{Schema.class};
   private static final Map<Class,Constructor> CTOR_CACHE =
     new ConcurrentHashMap<Class,Constructor>();
 
-  /** Create an instance of a class. */
+  /** Tag interface that indicates that a class has a one-argument constructor
+   * that accepts a Schema.
+   * @see SpecificDatumReader#newInstance
+   */
+  public interface SchemaConstructable {}
+
+  /** Create an instance of a class.  If the class implements {@link
+   * SchemaConstructable}, call a constructor with a {@link
+   * org.apache.avro.Schema} parameter, otherwise use a no-arg constructor. */
   @SuppressWarnings("unchecked")
-  protected static Object newInstance(Class c) {
+  protected static Object newInstance(Class c, Schema s) {
+    boolean useSchema = SchemaConstructable.class.isAssignableFrom(c);
     Object result;
     try {
       Constructor meth = (Constructor)CTOR_CACHE.get(c);
       if (meth == null) {
-        meth = c.getDeclaredConstructor(EMPTY_ARRAY);
+        meth = c.getDeclaredConstructor(useSchema ? SCHEMA_ARG : NO_ARG);
         meth.setAccessible(true);
         CTOR_CACHE.put(c, meth);
       }
-      result = meth.newInstance();
+      result = meth.newInstance(useSchema ? new Object[]{s} : (Object[])null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java Wed Jul 21 21:14:29 2010
@@ -44,14 +44,12 @@ public class SpecificDatumWriter<T> exte
   }
   
   @Override
-  protected Object getField(Object record, String name, int position) {
-    return ((SpecificRecord)record).get(position);
-  }
-
-  @Override
   protected void writeEnum(Schema schema, Object datum, Encoder out)
     throws IOException {
-    out.writeEnum(((Enum)datum).ordinal());
+    if (!(datum instanceof Enum))
+      super.writeEnum(schema, datum, out);        // punt to generic
+    else
+      out.writeEnum(((Enum)datum).ordinal());
   }
 
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html Wed Jul 21 21:14:29 2010
@@ -36,5 +36,10 @@ Generate specific Java classes for schem
 
 </ul>
 
+<p>Note that when a generated class is not found corresponding to a
+  record, enum or fixed schema, a {@link org.apache.avro.generic
+  generic} representation is used.  This permits generated classes to
+  be nested within generic data structures.
+
 </body>
 </html>

Modified: avro/trunk/lang/java/src/test/bin/test_tools.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/bin/test_tools.sh?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/bin/test_tools.sh (original)
+++ avro/trunk/lang/java/src/test/bin/test_tools.sh Wed Jul 21 21:14:29 2010
@@ -81,7 +81,7 @@ $CMD getschema $TMPDIR/data_file_write.a
   | cmp -s - <(echo '"string"')
 ######################################################################
 # Test tethered mapred
-$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema ../../share/test/schemas/WordCount.avsc --program build/test/wordcount.jar
+$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema src/test/java/org/apache/avro/mapred/tether/WordCount.avsc --program build/test/wordcount.jar
 $CMD tojson build/test/mapred/tout/part-00000.avro \
   | cmp -s - <($CMD tojson build/test/mapred/out/part-00000.avro)
 

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java Wed Jul 21 21:14:29 2010
@@ -412,9 +412,9 @@ public class TestSchema {
     assertFalse(s0.equals(s2));
   }
 
-  private static void checkBinary(Schema schema, Object datum,
-                                  DatumWriter<Object> writer,
-                                  DatumReader<Object> reader)
+  public static void checkBinary(Schema schema, Object datum,
+                                 DatumWriter<Object> writer,
+                                 DatumReader<Object> reader)
     throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     writer.setSchema(schema);

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileReader;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import test.Weather;
+
+/** Tests mapred API with a specific record. */
+public class TestWeather {
+
+  /** Uses default mapper with no reduces for a map-only identity job. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testMapOnly() throws Exception {
+    JobConf job = new JobConf();
+    String inDir = System.getProperty("share.dir",".")+"/test/data";
+    Path input = new Path(inDir+"/weather.avro");
+    Path output = new Path(System.getProperty("test.dir",".")+"/weather-ident");
+    
+    output.getFileSystem(job).delete(output);
+    
+    job.setJobName("identity map weather");
+    
+    AvroJob.setInputSchema(job, Weather.SCHEMA$);
+    AvroJob.setMapOutputSchema(job, Weather.SCHEMA$);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    job.setNumReduceTasks(0);                     // map-only
+    
+    JobClient.runJob(job);
+
+    // check output is correct
+    DatumReader<Weather> reader = new SpecificDatumReader<Weather>();
+    DataFileReader<Weather> check = new DataFileReader<Weather>
+      (new File(inDir+"/weather.avro"), reader);
+    DataFileReader<Weather> sorted = new DataFileReader<Weather>
+      (new File(output.toString()+"/part-00000.avro"), reader);
+
+    for (Weather w : sorted)
+      assertEquals(check.next(), w);
+
+    check.close();
+    sorted.close();
+  }
+
+  // maps input Weather to Pair<Weather,Void>, to sort by Weather
+  public static class SortMapper extends AvroMapper<Weather,Pair<Weather,Void>>{
+    @Override
+    public void map(Weather w, AvroCollector<Pair<Weather,Void>> collector,
+                      Reporter reporter) throws IOException {
+      collector.collect(new Pair<Weather,Void>(w, (Void)null));
+    }
+  }
+  
+  // output keys only, since values are empty
+  public static class SortReducer
+    extends AvroReducer<Weather, Void, Weather> {
+    @Override
+    public void reduce(Weather w, Iterable<Void> ignore,
+                       AvroCollector<Weather> collector,
+                       Reporter reporter) throws IOException {
+      collector.collect(w);
+    }
+  }    
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSort() throws Exception {
+    JobConf job = new JobConf();
+    String inDir = System.getProperty("share.dir",".")+"/test/data";
+    Path input = new Path(inDir+"/weather.avro");
+    Path output = new Path(System.getProperty("test.dir",".")+"/weather-sort");
+    
+    output.getFileSystem(job).delete(output);
+    
+    job.setJobName("sort weather");
+    
+    AvroJob.setInputSchema(job, Weather.SCHEMA$);
+    AvroJob.setMapOutputSchema
+      (job, Pair.getPairSchema(Weather.SCHEMA$, Schema.create(Type.NULL)));
+    AvroJob.setOutputSchema(job, Weather.SCHEMA$);
+    
+    AvroJob.setMapperClass(job, SortMapper.class);        
+    AvroJob.setReducerClass(job, SortReducer.class);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    JobClient.runJob(job);
+
+    // check output is correct
+    DatumReader<Weather> reader = new SpecificDatumReader<Weather>();
+    DataFileReader<Weather> check = new DataFileReader<Weather>
+      (new File(inDir+"/weather-sorted.avro"), reader);
+    DataFileReader<Weather> sorted = new DataFileReader<Weather>
+      (new File(output.toString()+"/part-00000.avro"), reader);
+
+    for (Weather w : sorted)
+      assertEquals(check.next(), w);
+
+    check.close();
+    sorted.close();
+  }
+
+
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class TestWordCount {
+
+  public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long> > {
+    @Override
+      public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
+                      Reporter reporter) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(text.toString());
+      while (tokens.hasMoreTokens())
+        collector.collect(new Pair<Utf8,Long>(new Utf8(tokens.nextToken()),1L));
+    }
+  }
+  
+  public static class ReduceImpl
+    extends AvroReducer<Utf8, Long, Pair<Utf8, Long> > {
+    @Override
+    public void reduce(Utf8 word, Iterable<Long> counts,
+                       AvroCollector<Pair<Utf8,Long>> collector,
+                       Reporter reporter) throws IOException {
+      long sum = 0;
+      for (long count : counts)
+        sum += count;
+      collector.collect(new Pair<Utf8,Long>(word, sum));
+    }
+  }    
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+    
+    outputPath.getFileSystem(job).delete(outputPath);
+    WordCountUtil.writeLinesFile();
+    
+    job.setJobName("wordcount");
+    
+    AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSchema(job,
+                            new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
+    
+    AvroJob.setMapperClass(job, MapImpl.class);        
+    AvroJob.setCombinerClass(job, ReduceImpl.class);
+    AvroJob.setReducerClass(job, ReduceImpl.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    JobClient.runJob(job);
+    
+    WordCountUtil.validateCountsFile();
+  }
+
+}

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Wed Jul 21 21:14:29 2010
@@ -58,14 +58,14 @@ class WordCountUtil {
     "the rain in spain falls mainly on the plains"
   };
 
-  private static final Map<String,Integer> COUNTS =
-    new TreeMap<String,Integer>();
+  private static final Map<String,Long> COUNTS =
+    new TreeMap<String,Long>();
   static {
     for (String line : LINES) {
       StringTokenizer tokens = new StringTokenizer(line);
       while (tokens.hasMoreTokens()) {
         String word = tokens.nextToken();
-        int count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0;
+        long count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0L;
         count++;
         COUNTS.put(word, count);
       }
@@ -93,13 +93,15 @@ class WordCountUtil {
   }
 
   public static void validateCountsFile() throws IOException {
-    DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();
+    DatumReader<Pair<Utf8,Long>> reader
+      = new SpecificDatumReader<Pair<Utf8,Long>>();
     InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
-    DataFileStream<WordCount> counts = new DataFileStream<WordCount>(in,reader);
+    DataFileStream<Pair<Utf8,Long>> counts
+      = new DataFileStream<Pair<Utf8,Long>>(in,reader);
     int numWords = 0;
-    for (WordCount wc : counts) {
-      assertEquals(wc.word.toString(),
-                   (int)COUNTS.get(wc.word.toString()), wc.count);
+    for (Pair<Utf8,Long> wc : counts) {
+      assertEquals(wc.key().toString(),
+                   COUNTS.get(wc.key().toString()), wc.value());
       numWords++;
     }
     in.close();

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc Wed Jul 21 21:14:29 2010
@@ -0,0 +1,6 @@
+{"type":"record",
+ "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+     {"name":"key","type":"string"},
+     {"name":"value","type":"long","order":"ignore"}
+ ]
+}