You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/07/21 23:14:31 UTC
svn commit: r966422 [1/2] - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/generic/
lang/java/src/java/org/apache/avro/mapred/
lang/java/src/java/org/apache/avro/mapred/tether/
lang/java/src/java/org/apache/avro/reflect/ lang/java/src/java/org/a...
Author: cutting
Date: Wed Jul 21 21:14:29 2010
New Revision: 966422
URL: http://svn.apache.org/viewvc?rev=966422&view=rev
Log:
AVRO-580, AVRO-581. Java: Update MapReduce APIs to use key/value pairs for intermediate data.
Added:
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc
avro/trunk/share/test/data/
avro/trunk/share/test/data/weather-sorted.avro (with props)
avro/trunk/share/test/data/weather.avro (with props)
avro/trunk/share/test/data/weather.json
avro/trunk/share/test/schemas/weather.avsc
Removed:
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
avro/trunk/share/test/schemas/WordCount.avsc
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java
avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java
avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java
avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java
avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html
avro/trunk/lang/java/src/test/bin/test_tools.sh
avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
avro/trunk/lang/java/src/test/java/org/apache/avro/specific/TestSpecificData.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Jul 21 21:14:29 2010
@@ -32,6 +32,10 @@ Avro 1.4.0 (unreleased)
AVRO-405: Java: Add Netty-based RPC transceiver and server
implementation. (Harry Wang via cutting)
+ AVRO-580. Permit intermixing of generic and specific data.
+ SpecificDatumReader and SpecificDatumWriter will now use generic
+ types when no specific class is available. (cutting)
+
IMPROVEMENTS
AVRO-584. Update Histogram for Stats Plugin
(Patrick Wendell via philz)
@@ -69,6 +73,9 @@ Avro 1.4.0 (unreleased)
AVRO-596. Start Netty server eagerly in constructor.
(Patrick Linehan via cutting)
+ AVRO-581. Java: Update MapReduce APIs to use key/value pairs for
+ intermediate data. (cutting)
+
BUG FIXES
AVRO-502. Memory leak from parsing JSON schema.
Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/IndexedRecord.java Wed Jul 21 21:14:29 2010
@@ -19,8 +19,12 @@ package org.apache.avro.generic;
/** A record implementation that permits field access by integer index.*/
public interface IndexedRecord extends GenericContainer {
- /** Set the value of a field given its position in the schema. */
+ /** Set the value of a field given its position in the schema.
+ * <p>This method is not meant to be called by user code, but only by {@link
+ * org.apache.avro.io.DatumReader} implementations. */
void put(int i, Object v);
- /** Return the value of a field given its position in the schema. */
+ /** Return the value of a field given its position in the schema.
+ * <p>This method is not meant to be called by user code, but only by {@link
+ * org.apache.avro.io.DatumWriter} implementations. */
Object get(int i);
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroCollector.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+
+/** A collector for map and reduce output. */
+public abstract class AvroCollector<T> extends Configured {
+ public abstract void collect(T datum) throws IOException;
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Wed Jul 21 21:14:29 2010
@@ -20,7 +20,6 @@ package org.apache.avro.mapred;
import java.util.Collection;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.avro.Schema;
@@ -29,12 +28,9 @@ import org.apache.avro.Schema;
public class AvroJob {
private AvroJob() {} // no public ctor
- static final String API_GENERIC = "generic";
- static final String API_SPECIFIC = "specific";
-
- static final String INPUT_API = "avro.input.api";
- static final String OUTPUT_API = "avro.output.api";
- static final String MAP_OUTPUT_API = "avro.map.output.api";
+ static final String MAPPER = "avro.mapper";
+ static final String COMBINER = "avro.combiner";
+ static final String REDUCER = "avro.reducer";
/** The configuration key for a job's input schema. */
public static final String INPUT_SCHEMA = "avro.input.schema";
@@ -43,86 +39,86 @@ public class AvroJob {
/** The configuration key for a job's output schema. */
public static final String OUTPUT_SCHEMA = "avro.output.schema";
- /** Configure a job's map input to use Avro's generic API. */
- public static void setInputGeneric(JobConf job, Schema s) {
- job.set(INPUT_API, API_GENERIC);
- configureAvroInput(job, s);
- }
-
- /** Configure a job's map input to use Avro's specific API. */
- public static void setInputSpecific(JobConf job, Schema s) {
- job.set(INPUT_API, API_SPECIFIC);
- configureAvroInput(job, s);
- }
-
- private static void configureAvroInput(JobConf job, Schema s) {
+ /** Configure a job's map input schema. */
+ public static void setInputSchema(JobConf job, Schema s) {
job.set(INPUT_SCHEMA, s.toString());
- job.setInputFormat(AvroInputFormat.class);
- }
-
- /** Configure a job's map output key schema using Avro's generic API. */
- public static void setMapOutputGeneric(JobConf job, Schema s) {
- job.set(MAP_OUTPUT_API, API_GENERIC);
- setMapOutputSchema(job, s);
- configureAvroOutput(job);
- }
-
- /** Configure a job's map output key schema using Avro's specific API. */
- public static void setMapOutputSpecific(JobConf job, Schema s) {
- job.set(MAP_OUTPUT_API, API_SPECIFIC);
- setMapOutputSchema(job, s);
- configureAvroOutput(job);
- }
-
- /** Configure a job's output key schema using Avro's generic API. */
- public static void setOutputGeneric(JobConf job, Schema s) {
- job.set(OUTPUT_API, API_GENERIC);
- setOutputSchema(job, s);
- configureAvroOutput(job);
+ configureAvroJob(job);
}
- /** Configure a job's output key schema using Avro's specific API. */
- public static void setOutputSpecific(JobConf job, Schema s) {
- job.set(OUTPUT_API, API_SPECIFIC);
- setOutputSchema(job, s);
- configureAvroOutput(job);
+ /** Return a job's map input schema. */
+ public static Schema getInputSchema(Configuration job) {
+ return Schema.parse(job.get(INPUT_SCHEMA));
}
- /** Set a job's map output key schema. */
+ /** Configure a job's map output schema. The map output schema defaults to
+ * the output schema and need only be specified when it differs. This must
+ * be a {@link Pair} schema. */
public static void setMapOutputSchema(JobConf job, Schema s) {
job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ configureAvroJob(job);
}
/** Return a job's map output key schema. */
public static Schema getMapOutputSchema(Configuration job) {
- return Schema.parse(job.get(AvroJob.MAP_OUTPUT_SCHEMA,
- job.get(AvroJob.OUTPUT_SCHEMA)));
+ return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
}
- /** Set a job's output key schema. */
+ /** Configure a job's output schema. Unless this is a map-only job, this
+ * must be a {@link Pair} schema. */
public static void setOutputSchema(JobConf job, Schema s) {
job.set(OUTPUT_SCHEMA, s.toString());
+ configureAvroJob(job);
}
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
- return Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+ return Schema.parse(job.get(OUTPUT_SCHEMA));
}
- private static void configureAvroOutput(JobConf job) {
+ private static void configureAvroJob(JobConf job) {
+ if (job.get("mapred.input.format.class") == null)
+ job.setInputFormat(AvroInputFormat.class);
+ if (job.get("mapred.output.format.class") == null)
+ job.setOutputFormat(AvroOutputFormat.class);
+
job.setOutputKeyClass(AvroWrapper.class);
job.setOutputKeyComparatorClass(AvroKeyComparator.class);
- job.setMapOutputValueClass(NullWritable.class);
- job.setOutputFormat(AvroOutputFormat.class);
+ job.setMapOutputKeyClass(AvroKey.class);
+ job.setMapOutputValueClass(AvroValue.class);
+
+
+ job.setMapperClass(HadoopMapper.class);
+ job.setReducerClass(HadoopReducer.class);
- // add AvroKeySerialization to io.serializations
+ // add AvroSerialization to io.serializations
Collection<String> serializations =
job.getStringCollection("io.serializations");
- if (!serializations.contains(AvroKeySerialization.class.getName())) {
- serializations.add(AvroKeySerialization.class.getName());
+ if (!serializations.contains(AvroSerialization.class.getName())) {
+ serializations.add(AvroSerialization.class.getName());
job.setStrings("io.serializations",
serializations.toArray(new String[0]));
}
}
+ /** Configure a job's mapper implementation. */
+ public static void setMapperClass(JobConf job,
+ Class<? extends AvroMapper> c) {
+ job.set(MAPPER, c.getName());
+ }
+
+ /** Configure a job's combiner implementation. */
+ public static void setCombinerClass(JobConf job,
+ Class<? extends AvroReducer> c) {
+ job.set(COMBINER, c.getName());
+ job.setCombinerClass(HadoopCombiner.class);
+ }
+
+ /** Configure a job's reducer implementation. */
+ public static void setReducerClass(JobConf job,
+ Class<? extends AvroReducer> c) {
+ job.set(REDUCER, c.getName());
+ }
+
+
+
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKey.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of keys for jobs configured with {@link AvroJob} . */
+public class AvroKey<T> extends AvroWrapper<T> {
+ /** Wrap a key. */
+ public AvroKey(T datum) { super(datum); }
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java Wed Jul 21 21:14:29 2010
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
/** The {@link RawComparator} used by jobs configured with {@link AvroJob}. */
@@ -32,29 +31,20 @@ public class AvroKeyComparator<T>
extends Configured implements RawComparator<AvroWrapper<T>> {
private Schema schema;
- private GenericData model;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
- if (conf != null) {
- schema = AvroJob.getMapOutputSchema(conf);
- String api = getConf().get(AvroJob.MAP_OUTPUT_API,
- getConf().get(AvroJob.OUTPUT_API));
- model = AvroJob.API_SPECIFIC.equals(api)
- ? SpecificData.get()
- : GenericData.get();
- }
+ if (conf != null)
+ schema = Pair.getKeySchema(AvroJob.getMapOutputSchema(conf));
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int diff = BinaryData.compare(b1, s1, b2, s2, schema);
- return diff == 0 ? -1 : diff;
+ return BinaryData.compare(b1, s1, b2, s2, schema);
}
public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
- int diff = model.compare(x.datum(), y.datum(), schema);
- return diff == 0 ? -1 : diff;
+ return SpecificData.get().compare(x.datum(), y.datum(), schema);
}
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroMapper.java Wed Jul 21 21:14:29 2010
@@ -20,47 +20,22 @@ package org.apache.avro.mapred;
import java.io.IOException;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.conf.Configured;
-/** A {@link Mapper} for Avro data.
+/** A mapper for Avro data.
*
- * <p>Applications should subclass this class and pass their subclass to {@link
- * org.apache.hadoop.mapred.JobConf#setMapperClass(Class)}. Subclasses must
- * override {@link #map} and may call {@link #collect} to generate output.
+ * <p>Applications subclass this class and pass their subclass to {@link
+ * AvroJob#setMapperClass}, overriding {@link #map}.
*/
-public abstract class AvroMapper<IN,OUT> extends MapReduceBase
- implements Mapper<AvroWrapper<IN>, NullWritable,
- AvroWrapper<OUT>, NullWritable> {
-
- private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
- private Reporter reporter;
-
- public void map(AvroWrapper<IN> wrapper, NullWritable value,
- OutputCollector<AvroWrapper<OUT>, NullWritable> output,
- Reporter reporter) throws IOException {
- if (this.out == null) {
- this.out = output;
- this.reporter = reporter;
- }
- map(wrapper.datum());
- }
-
- /** Return the {@link Reporter} to permit status updates. */
- public Reporter getReporter() { return reporter; }
-
- /** Called with each map input datum. */
- public abstract void map(IN datum) throws IOException;
+public class AvroMapper<IN,OUT> extends Configured {
- private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
-
- /** Call with each map output datum. */
- public void collect(OUT datum) throws IOException {
- outputWrapper.datum(datum);
- out.collect(outputWrapper, NullWritable.get());
+ /** Called with each map input datum. By default, collects inputs. */
+ @SuppressWarnings("unchecked")
+ public void map(IN datum, AvroCollector<OUT> collector, Reporter reporter)
+ throws IOException {
+ collector.collect((OUT)datum);
}
+
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Wed Jul 21 21:14:29 2010
@@ -30,8 +30,6 @@ import org.apache.hadoop.mapred.RecordWr
import org.apache.hadoop.util.Progressable;
import org.apache.avro.Schema;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.CodecFactory;
@@ -60,14 +58,13 @@ public class AvroOutputFormat <T>
String name, Progressable prog)
throws IOException {
- Schema schema = AvroJob.getOutputSchema(job);
+ boolean isMapOnly = job.getNumReduceTasks() == 0;
+ Schema schema = isMapOnly
+ ? AvroJob.getMapOutputSchema(job)
+ : AvroJob.getOutputSchema(job);
- DatumWriter<T> datumWriter =
- AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))
- ? new SpecificDatumWriter<T>()
- : new GenericDatumWriter<T>();
-
- final DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
+ final DataFileWriter<T> writer =
+ new DataFileWriter<T>(new SpecificDatumWriter<T>());
if (FileOutputFormat.getCompressOutput(job)) {
int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Wed Jul 21 21:14:29 2010
@@ -25,9 +25,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
/** An {@link RecordReader} for Avro data files. */
@@ -42,12 +41,9 @@ public class AvroRecordReader<T>
public AvroRecordReader(JobConf job, FileSplit split)
throws IOException {
this.in = new FsInput(split.getPath(), job);
- DatumReader<T> datumReader =
- AvroJob.API_SPECIFIC.equals(job.get(AvroJob.INPUT_API))
- ? new SpecificDatumReader<T>()
- : new GenericDatumReader<T>();
- this.reader = new DataFileReader<T>(in, datumReader);
+ Schema s = AvroJob.getInputSchema(job);
+ this.reader = new DataFileReader<T>(in, new SpecificDatumReader<T>(s));
reader.sync(split.getStart()); // sync to start
this.start = in.tell();
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroReducer.java Wed Jul 21 21:14:29 2010
@@ -19,54 +19,33 @@
package org.apache.avro.mapred;
import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.conf.Configured;
-/** A {@link Reducer} for Avro data.
+/** A reducer for Avro data.
*
* <p>Applications should subclass this class and pass their subclass to {@link
- * org.apache.hadoop.mapred.JobConf#setReducerClass(Class)} and perhaps {@link
- * org.apache.hadoop.mapred.JobConf#setCombinerClass(Class)} Subclasses must
- * override {@link #reduce} and may call {@link #collect} to generate output.
- *
- * <p>Note that reducers here are not passed an iterator of all matching
- * values. Rather, the reducer is called with every value. If values are to
- * be combined then the reducer must maintain state accordingly. The final
- * value may be flushed by overriding {@link #close} to call {@link #collect}.
+ * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
+ * Subclasses override {@link #reduce}.
*/
-public abstract class AvroReducer<IN,OUT> extends MapReduceBase
- implements Reducer<AvroWrapper<IN>, NullWritable,
- AvroWrapper<OUT>, NullWritable> {
-
- private OutputCollector<AvroWrapper<OUT>, NullWritable> out;
- private Reporter reporter;
- private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+public class AvroReducer<K,V,OUT> extends Configured {
+
+ private Pair<K,V> outputPair;
- public void reduce(AvroWrapper<IN> wrapper, Iterator<NullWritable> ignore,
- OutputCollector<AvroWrapper<OUT>,NullWritable> output,
+ /** Called with all map output values with a given key. By default, pairs
+ * key with each value, collecting {@link Pair} instances. */
+ @SuppressWarnings("unchecked")
+ public void reduce(K key, Iterable<V> values,
+ AvroCollector<OUT> collector,
Reporter reporter) throws IOException {
- if (this.out == null) {
- this.out = output;
- this.reporter = reporter;
+ if (outputPair == null)
+ outputPair = new Pair<K,V>(AvroJob.getOutputSchema(getConf()));
+ for (V value : values) {
+ outputPair.set(key, value);
+ collector.collect((OUT)outputPair);
}
- reduce(wrapper.datum());
- }
-
- /** Return the {@link Reporter} to permit status updates. */
- public Reporter getReporter() { return reporter; }
-
- /** Called with each reduce input datum for this partition, in order. */
- public abstract void reduce(IN datum) throws IOException;
-
- /** Call with each final output datum. */
- public void collect(OUT datum) throws IOException {
- outputWrapper.datum(datum);
- out.collect(outputWrapper, NullWritable.get());
}
+
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroSerialization.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
+public class AvroSerialization<T> extends Configured
+ implements Serialization<AvroWrapper<T>> {
+
+ public boolean accept(Class<?> c) {
+ return AvroWrapper.class.isAssignableFrom(c);
+ }
+
+ /** Returns the specified map output deserializer. Defaults to the final
+ * output deserializer if no map output schema was specified. */
+ public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+ // We need not rely on mapred.task.is.map here to determine whether map
+ // output or final output is desired, since the mapreduce framework never
+ // creates a deserializer for final output, only for map output.
+ boolean isKey = AvroKey.class.isAssignableFrom(c);
+ Schema schema = isKey
+ ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
+ : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf()));
+ return new AvroWrapperDeserializer(new SpecificDatumReader<T>(schema),
+ isKey);
+ }
+
+ private static final DecoderFactory FACTORY = new DecoderFactory();
+ static { FACTORY.configureDirectDecoder(true); }
+
+ private class AvroWrapperDeserializer
+ implements Deserializer<AvroWrapper<T>> {
+
+ private DatumReader<T> reader;
+ private BinaryDecoder decoder;
+ private boolean isKey;
+
+ public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
+ this.reader = reader;
+ this.isKey = isKey;
+ }
+
+ public void open(InputStream in) {
+ this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+ }
+
+ public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
+ throws IOException {
+ T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
+ if (wrapper == null) {
+ wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum);
+ } else {
+ wrapper.datum(datum);
+ }
+ return wrapper;
+ }
+
+ public void close() throws IOException {
+ decoder.inputStream().close();
+ }
+
+ }
+
+ /** Returns the specified output serializer. */
+ public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+ // Here we must rely on mapred.task.is.map to tell whether the map output
+ // or final output is needed.
+ boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
+ Schema schema = !isMap
+ ? AvroJob.getOutputSchema(getConf())
+ : (AvroKey.class.isAssignableFrom(c)
+ ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
+ : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf())));
+ return new AvroWrapperSerializer(new SpecificDatumWriter<T>(schema));
+ }
+
+ private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
+
+ private DatumWriter<T> writer;
+ private OutputStream out;
+ private BinaryEncoder encoder;
+
+ public AvroWrapperSerializer(DatumWriter<T> writer) {
+ this.writer = writer;
+ }
+
+ public void open(OutputStream out) {
+ this.out = out;
+ this.encoder = new BinaryEncoder(out);
+ }
+
+ public void serialize(AvroWrapper<T> wrapper) throws IOException {
+ writer.write(wrapper.datum(), encoder);
+ }
+
+ public void close() throws IOException {
+ out.close();
+ }
+
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroValue.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+/** The wrapper of values for jobs configured with {@link AvroJob} . */
+public class AvroValue<T> extends AvroWrapper<T> {
+ /** Wrap a value. */
+ public AvroValue(T datum) { super(datum); }
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroWrapper.java Wed Jul 21 21:14:29 2010
@@ -18,11 +18,11 @@
package org.apache.avro.mapred;
-/** The wrapper of values for jobs configured with {@link AvroJob} . */
+/** The wrapper of data for jobs configured with {@link AvroJob} . */
public class AvroWrapper<T> {
private T datum;
- /** Wrap a value datum. */
+ /** Wrap a datum. */
public AvroWrapper(T datum) { this.datum = datum; }
/** Return the wrapped datum. */
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopCombiner.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Reducer} and an {@link
+ * AvroReducer} used when combining. A combiner's output is intermediate data
+ * again, so each {@link Pair} the AvroReducer emits is split into an {@link
+ * AvroKey} and an {@link AvroValue} before it is collected. */
+class HadoopCombiner<K,V>
+ extends HadoopReducerBase<K,V,Pair<K,V>,AvroKey<K>,AvroValue<V>> {
+
+ @Override @SuppressWarnings("unchecked")
+ protected AvroReducer<K,V,Pair<K,V>> getReducer(JobConf conf) {
+ // The combiner implementation is configured under AvroJob.COMBINER.
+ Class<? extends AvroReducer> combinerClass =
+ conf.getClass(AvroJob.COMBINER, AvroReducer.class, AvroReducer.class);
+ return ReflectionUtils.newInstance(combinerClass, conf);
+ }
+
+ /** Splits each collected Pair into a reused key wrapper and a reused value
+ * wrapper, then forwards both to Hadoop's collector. */
+ private class PairCollector extends AvroCollector<Pair<K,V>> {
+ private final AvroKey<K> reusableKey = new AvroKey<K>(null);
+ private final AvroValue<V> reusableValue = new AvroValue<V>(null);
+ private final OutputCollector<AvroKey<K>,AvroValue<V>> target;
+
+ public PairCollector(OutputCollector<AvroKey<K>,AvroValue<V>> collector) {
+ target = collector;
+ }
+
+ public void collect(Pair<K,V> pair) throws IOException {
+ reusableKey.datum(pair.key());
+ reusableValue.datum(pair.value());
+ target.collect(reusableKey, reusableValue);
+ }
+ }
+
+ @Override
+ protected AvroCollector<Pair<K,V>>
+ getCollector(OutputCollector<AvroKey<K>,AvroValue<V>> collector) {
+ return new PairCollector(collector);
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopMapper.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
+ * AvroMapper}. Outputs are written directly when a job is map-only, but are
+ * otherwise assumed to be pairs that are split. */
+class HadoopMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase
+ implements Mapper<AvroWrapper<IN>, NullWritable, KO, VO> {
+
+ private AvroMapper<IN,OUT> mapper;
+ private MapCollector out;
+ private boolean isMapOnly;
+
+ @Override @SuppressWarnings("unchecked")
+ public void configure(JobConf conf) {
+ this.mapper =
+ ReflectionUtils.newInstance
+ (conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
+ conf);
+ // With zero reduce tasks the mapper's output is the job's output, not
+ // intermediate key/value data.
+ this.isMapOnly = conf.getNumReduceTasks() == 0;
+ }
+
+ /** Adapts the AvroMapper's output to Hadoop's OutputCollector.
+ * In a map-only job, each datum is wrapped in an {@link AvroWrapper} and
+ * paired with a {@link NullWritable}. Otherwise, each datum must be a
+ * {@link Pair}; it is split into an {@link AvroKey} and an {@link AvroValue}.
+ * The wrapper objects are reused across calls. */
+ private class MapCollector extends AvroCollector<OUT> {
+ private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
+ private final AvroKey<K> keyWrapper = new AvroKey<K>(null);
+ private final AvroValue<V> valueWrapper = new AvroValue<V>(null);
+ private final OutputCollector<KO,VO> collector;
+
+ public MapCollector(OutputCollector<KO,VO> collector) {
+ this.collector = collector;
+ }
+
+ // The casts to KO/VO and Pair are unchecked. Which types are actually
+ // collected depends on isMapOnly, and the generic signature cannot
+ // express that.
+ @SuppressWarnings("unchecked")
+ public void collect(OUT datum) throws IOException {
+ if (isMapOnly) {
+ wrapper.datum(datum);
+ collector.collect((KO)wrapper, (VO)NullWritable.get());
+ } else { // split a pair
+ Pair<K,V> pair = (Pair<K,V>)datum;
+ keyWrapper.datum(pair.key());
+ valueWrapper.datum(pair.value());
+ collector.collect((KO)keyWrapper, (VO)valueWrapper);
+ }
+ }
+ }
+
+ @Override
+ public void map(AvroWrapper<IN> wrapper, NullWritable value,
+ OutputCollector<KO,VO> collector,
+ Reporter reporter) throws IOException {
+ // Only the first collector is adapted; this assumes Hadoop passes the
+ // same collector on every call for a task.
+ if (this.out == null)
+ this.out = new MapCollector(collector);
+ mapper.map(wrapper.datum(), out, reporter);
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducer.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Bridge between a {@link org.apache.hadoop.mapred.Reducer} and an {@link
+ * AvroReducer}. Each datum the AvroReducer emits is wrapped in an {@link
+ * AvroWrapper} and collected with a {@link NullWritable} value. */
+class HadoopReducer<K,V,OUT>
+ extends HadoopReducerBase<K,V, OUT, AvroWrapper<OUT>, NullWritable> {
+
+ @Override @SuppressWarnings("unchecked")
+ protected AvroReducer<K,V,OUT> getReducer(JobConf conf) {
+ // The reducer implementation is configured under AvroJob.REDUCER.
+ Class<? extends AvroReducer> reducerClass =
+ conf.getClass(AvroJob.REDUCER, AvroReducer.class, AvroReducer.class);
+ return ReflectionUtils.newInstance(reducerClass, conf);
+ }
+
+ /** Wraps each output datum in a single, reused {@link AvroWrapper}. */
+ private class ReduceCollector extends AvroCollector<OUT> {
+ private final AvroWrapper<OUT> outputWrapper = new AvroWrapper<OUT>(null);
+ private final OutputCollector<AvroWrapper<OUT>, NullWritable> sink;
+
+ public ReduceCollector(OutputCollector<AvroWrapper<OUT>,NullWritable> out) {
+ sink = out;
+ }
+
+ public void collect(OUT datum) throws IOException {
+ outputWrapper.datum(datum);
+ sink.collect(outputWrapper, NullWritable.get());
+ }
+ }
+
+ @Override
+ protected AvroCollector<OUT>
+ getCollector(OutputCollector<AvroWrapper<OUT>, NullWritable> collector) {
+ return new ReduceCollector(collector);
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Reducer;
+
+/** Common base of the Hadoop reducer and combiner bridges. It creates an
+ * {@link AvroReducer} via {@link #getReducer}, unwraps each key and value,
+ * and passes the reducer a collector obtained from {@link #getCollector}. */
+abstract class HadoopReducerBase<K,V,OUT,KO,VO> extends MapReduceBase
+ implements Reducer<AvroKey<K>, AvroValue<V>, KO, VO> {
+
+ private AvroReducer<K,V,OUT> reducer;
+ private AvroCollector<OUT> collector;
+
+ /** Create the AvroReducer this bridge delegates to. */
+ protected abstract AvroReducer<K,V,OUT> getReducer(JobConf conf);
+ /** Adapt Hadoop's collector to one that accepts the AvroReducer's output. */
+ protected abstract AvroCollector<OUT> getCollector(OutputCollector<KO,VO> c);
+
+ @Override
+ public void configure(JobConf conf) {
+ reducer = getReducer(conf);
+ }
+
+ /** Reusable view that unwraps each {@link AvroValue} as it is iterated.
+ * {@link #iterator} returns this same object, so the values for a key can
+ * be traversed only once. */
+ class ReduceIterable implements Iterable<V>, Iterator<V> {
+ private Iterator<AvroValue<V>> values;
+ public boolean hasNext() { return values.hasNext(); }
+ public V next() {
+ AvroValue<V> wrapped = values.next();
+ return wrapped.datum();
+ }
+ public void remove() { throw new UnsupportedOperationException(); }
+ public Iterator<V> iterator() { return this; }
+ }
+ private final ReduceIterable reduceIterable = new ReduceIterable();
+
+ @Override
+ public final void reduce(AvroKey<K> key, Iterator<AvroValue<V>> values,
+ OutputCollector<KO, VO> out,
+ Reporter reporter) throws IOException {
+ // Only the first collector is adapted; this assumes Hadoop passes the
+ // same collector on every call for a task.
+ if (collector == null) {
+ collector = getCollector(out);
+ }
+ reduceIterable.values = values;
+ K datum = key.datum();
+ reducer.reduce(datum, reduceIterable, collector, reporter);
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/Pair.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificDatumReader.SchemaConstructable;
+import org.apache.avro.util.Utf8;
+
+/** A key/value pair, represented as an Avro record named after this class with
+ * two fields, "key" and "value". The value field is declared with sort order
+ * {@code IGNORE}, so ordering (and therefore {@link #equals}) considers only
+ * the key. */
+public class Pair<K,V>
+ implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
+
+ private static final String PAIR = Pair.class.getName();
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+
+ private Schema schema;
+ private K key;
+ private V value;
+
+ /** Construct an empty pair with the given schema.
+ * @throws IllegalArgumentException if the schema's full name is not this
+ * class's name */
+ public Pair(Schema schema) {
+ checkIsPairSchema(schema);
+ this.schema = schema;
+ }
+
+ /** Construct a pair whose schema is derived from the given key and value
+ * schemas via {@link #getPairSchema}. */
+ public Pair(K key, Schema keySchema, V value, Schema valueSchema) {
+ this.schema = getPairSchema(keySchema, valueSchema);
+ this.key = key;
+ this.value = value;
+ }
+
+ /** Reject any schema whose full name is not this class's name. */
+ private static void checkIsPairSchema(Schema schema) {
+ if (!PAIR.equals(schema.getFullName()))
+ throw new IllegalArgumentException("Not a Pair schema: "+schema);
+ }
+
+ /** Return a pair's key schema.
+ * @throws IllegalArgumentException if {@code pair} is not a Pair schema */
+ public static Schema getKeySchema(Schema pair) {
+ checkIsPairSchema(pair);
+ return pair.getField(KEY).schema();
+ }
+
+ /** Return a pair's value schema.
+ * @throws IllegalArgumentException if {@code pair} is not a Pair schema */
+ public static Schema getValueSchema(Schema pair) {
+ checkIsPairSchema(pair);
+ return pair.getField(VALUE).schema();
+ }
+
+ // Cache of pair schemas, indexed first by key schema and then by value
+ // schema. It is only accessed while holding its own lock.
+ // NOTE(review): WeakHashMap entries are not reclaimed while their values
+ // strongly refer to their keys. Each cached pair schema holds its key and
+ // value schemas as field schemas, so entries are likely never reclaimed.
+ // Confirm before relying on weak eviction.
+ private static final Map<Schema,Map<Schema,Schema>> SCHEMA_CACHE =
+ new WeakHashMap<Schema,Map<Schema,Schema>>();
+
+ /** Get the pair schema for the given key and value schemas. It is created on
+ * first use and cached, so repeated calls return the same instance while
+ * the entry remains cached. */
+ public static Schema getPairSchema(Schema key, Schema value) {
+ synchronized (SCHEMA_CACHE) {
+ Map<Schema,Schema> valueSchemas = SCHEMA_CACHE.get(key);
+ if (valueSchemas == null) {
+ valueSchemas = new WeakHashMap<Schema,Schema>();
+ SCHEMA_CACHE.put(key, valueSchemas);
+ }
+ Schema result = valueSchemas.get(value);
+ if (result == null) {
+ result = makePairSchema(key, value);
+ valueSchemas.put(value, result);
+ }
+ return result;
+ }
+ }
+
+ /** Build a record schema named after this class. It has a "key" field and a
+ * "value" field, and the value field is ignored when ordering. */
+ private static Schema makePairSchema(Schema key, Schema value) {
+ Schema pair = Schema.createRecord(PAIR, null, null, false);
+ List<Field> fields = new ArrayList<Field>();
+ fields.add(new Field(KEY, key, "", null));
+ fields.add(new Field(VALUE, value, "", null, Field.Order.IGNORE));
+ pair.setFields(fields);
+ return pair;
+ }
+
+ @Override public Schema getSchema() { return schema; }
+
+ /** Get the key. */
+ public K key() { return key; }
+ /** Set the key. */
+ public void key(K key) { this.key = key; }
+
+ /** Get the value. */
+ public V value() { return value; }
+ /** Set the value. */
+ public void value(V value) { this.value = value; }
+
+ /** Set both the key and value. */
+ public void set(K key, V value) { this.key = key; this.value = value; }
+
+ /** Two pairs are equal when their schemas are equal and they compare as
+ * equal. Because the value field is ignored when ordering, this means
+ * their keys are equal. */
+ @Override public boolean equals(Object o) {
+ if (o == this) return true; // identical object
+ if (!(o instanceof Pair)) return false; // not a pair
+ Pair that = (Pair)o;
+ // Compare schemas by value, not only by identity. A pair built with
+ // Pair(Schema) need not share the instance cached by getPairSchema.
+ if (this.schema != that.schema && !this.schema.equals(that.schema))
+ return false; // not the same schema
+ return this.compareTo(that) == 0;
+ }
+ @Override public int hashCode() {
+ return GenericData.get().hashCode(this, schema);
+ }
+ /** Compare using this pair's schema, where the value field is ignored. */
+ @Override public int compareTo(Pair that) {
+ return GenericData.get().compare(this, that, schema);
+ }
+ @Override public String toString() {
+ return GenericData.get().toString(this);
+ }
+
+ /** Positional access: index 0 is the key and index 1 is the value. */
+ @Override
+ public Object get(int i) {
+ switch (i) {
+ case 0: return key;
+ case 1: return value;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i);
+ }
+ }
+
+ /** Positional update: index 0 is the key and index 1 is the value. */
+ @Override @SuppressWarnings("unchecked")
+ public void put(int i, Object o) {
+ switch (i) {
+ case 0: this.key = (K)o; break;
+ case 1: this.value = (V)o; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i);
+ }
+ }
+
+ // Schemas for the Java types accepted by the typed constructors below.
+ private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
+ private static final Schema BYTES_SCHEMA = Schema.create(Type.BYTES);
+ private static final Schema INT_SCHEMA = Schema.create(Type.INT);
+ private static final Schema LONG_SCHEMA = Schema.create(Type.LONG);
+ private static final Schema FLOAT_SCHEMA = Schema.create(Type.FLOAT);
+ private static final Schema DOUBLE_SCHEMA = Schema.create(Type.DOUBLE);
+ private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+ // Typed convenience constructors. Each infers the key and value schemas
+ // from the argument types. A GenericContainer supplies its own schema via
+ // getSchema(). Utf8, ByteBuffer, Integer, Long, Float, Double and Void map
+ // to the string, bytes, int, long, float, double and null schemas. These
+ // constructors were generated by the commented-out program at the end of
+ // this class.
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, GenericContainer value) {
+ this((K)key, key.getSchema(), (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Utf8 value) {
+ this((K)key, key.getSchema(), (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, ByteBuffer value) {
+ this((K)key, key.getSchema(), (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Integer value) {
+ this((K)key, key.getSchema(), (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Long value) {
+ this((K)key, key.getSchema(), (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Float value) {
+ this((K)key, key.getSchema(), (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Double value) {
+ this((K)key, key.getSchema(), (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(GenericContainer key, Void value) {
+ this((K)key, key.getSchema(), (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, GenericContainer value) {
+ this((K)key, STRING_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Utf8 value) {
+ this((K)key, STRING_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, ByteBuffer value) {
+ this((K)key, STRING_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Integer value) {
+ this((K)key, STRING_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Long value) {
+ this((K)key, STRING_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Float value) {
+ this((K)key, STRING_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Double value) {
+ this((K)key, STRING_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Utf8 key, Void value) {
+ this((K)key, STRING_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, GenericContainer value) {
+ this((K)key, BYTES_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Utf8 value) {
+ this((K)key, BYTES_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, ByteBuffer value) {
+ this((K)key, BYTES_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Integer value) {
+ this((K)key, BYTES_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Long value) {
+ this((K)key, BYTES_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Float value) {
+ this((K)key, BYTES_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Double value) {
+ this((K)key, BYTES_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(ByteBuffer key, Void value) {
+ this((K)key, BYTES_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, GenericContainer value) {
+ this((K)key, INT_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Utf8 value) {
+ this((K)key, INT_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, ByteBuffer value) {
+ this((K)key, INT_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Integer value) {
+ this((K)key, INT_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Long value) {
+ this((K)key, INT_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Float value) {
+ this((K)key, INT_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Double value) {
+ this((K)key, INT_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Integer key, Void value) {
+ this((K)key, INT_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, GenericContainer value) {
+ this((K)key, LONG_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Utf8 value) {
+ this((K)key, LONG_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, ByteBuffer value) {
+ this((K)key, LONG_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Integer value) {
+ this((K)key, LONG_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Long value) {
+ this((K)key, LONG_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Float value) {
+ this((K)key, LONG_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Double value) {
+ this((K)key, LONG_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Long key, Void value) {
+ this((K)key, LONG_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, GenericContainer value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Utf8 value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, ByteBuffer value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Integer value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Long value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Float value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Double value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Float key, Void value) {
+ this((K)key, FLOAT_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, GenericContainer value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Utf8 value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, ByteBuffer value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Integer value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Long value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Float value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Double value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Double key, Void value) {
+ this((K)key, DOUBLE_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, GenericContainer value) {
+ this((K)key, NULL_SCHEMA, (V)value, value.getSchema());
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Utf8 value) {
+ this((K)key, NULL_SCHEMA, (V)value, STRING_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, ByteBuffer value) {
+ this((K)key, NULL_SCHEMA, (V)value, BYTES_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Integer value) {
+ this((K)key, NULL_SCHEMA, (V)value, INT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Long value) {
+ this((K)key, NULL_SCHEMA, (V)value, LONG_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Float value) {
+ this((K)key, NULL_SCHEMA, (V)value, FLOAT_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Double value) {
+ this((K)key, NULL_SCHEMA, (V)value, DOUBLE_SCHEMA);
+ }
+ @SuppressWarnings("unchecked")
+ public Pair(Void key, Void value) {
+ this((K)key, NULL_SCHEMA, (V)value, NULL_SCHEMA);
+ }
+
+ // Generator for the typed constructors above. Uncomment, run main(), and
+ // paste its output to regenerate them.
+ // private static final String[][] TABLE = new String[][] {
+ // {"GenericContainer", "{0}.getSchema()"},
+ // {"Utf8", "STRING_SCHEMA"},
+ // {"ByteBuffer", "BYTES_SCHEMA"},
+ // {"Integer", "INT_SCHEMA"},
+ // {"Long", "LONG_SCHEMA"},
+ // {"Float", "FLOAT_SCHEMA"},
+ // {"Double", "DOUBLE_SCHEMA"},
+ // {"Void", "NULL_SCHEMA"},
+ // };
+
+ // private static String f(String pattern, String value) {
+ // return java.text.MessageFormat.format(pattern, value);
+ // }
+
+ // public static void main(String... args) throws Exception {
+ // StringBuffer b = new StringBuffer();
+ // for (String[] k : TABLE) {
+ // for (String[] v : TABLE) {
+ // b.append("@SuppressWarnings(\"unchecked\")\n");
+ // b.append("public Pair("+k[0]+" key, "+v[0]+" value) {\n");
+ // b.append(" this((K)key, "+f(k[1],"key")
+ // +", (V)value, "+f(v[1],"value")+");\n");
+ // b.append("}\n");
+ // }
+ // }
+ // System.out.println(b);
+ // }
+
+
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Wed Jul 21 21:14:29 2010
@@ -28,19 +28,16 @@ Avro data, with map and reduce functions
<p>To use this for jobs whose input and output are Avro data files:
<ul>
+ <li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} and
+ {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+ job's input and output schemas.</li>
<li>Subclass {@link org.apache.avro.mapred.AvroMapper} and specify
- this as your job's mapper.</li>
+ this as your job's mapper with {@link
+ org.apache.avro.mapred.AvroJob#setMapperClass}</li>
<li>Subclass {@link org.apache.avro.mapred.AvroReducer} and specify
- this as your job's reducer and perhaps combiner.</li>
- <li>Depending on whether your mapper uses Avro's specific or
- generic API for inputs, call one of {@link
- org.apache.avro.mapred.AvroJob#setInputSpecific} or {@link
- org.apache.avro.mapred.AvroJob#setInputGeneric} with your input schema.</li>
- <li>Depending on whether your job uses Avro's specific or generic
- API for outputs, call one of {@link
- org.apache.avro.mapred.AvroJob#setOutputSpecific} or {@link
- org.apache.avro.mapred.AvroJob#setOutputGeneric} with your output
- schema.</li>
+ this as your job's reducer and perhaps combiner, with {@link
+ org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
+ org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
<li>Specify input files with {@link org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
<li>Specify an output directory with {@link
org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java Wed Jul 21 21:14:29 2010
@@ -137,9 +137,11 @@ public class TetherJob extends Configure
FileInputFormat.addInputPaths(job, in.value(opts));
FileOutputFormat.setOutputPath(job, out.value(opts));
TetherJob.setExecutable(job, exec.value(opts));
- AvroJob.setOutputSchema(job, Schema.parse(outSchema.value(opts)));
+ job.set(AvroJob.OUTPUT_SCHEMA,
+ Schema.parse(outSchema.value(opts)).toString());
if (opts.hasArgument(mapOutSchema))
- AvroJob.setMapOutputSchema(job, Schema.parse(mapOutSchema.value(opts)));
+ job.set(AvroJob.MAP_OUTPUT_SCHEMA,
+ Schema.parse(mapOutSchema.value(opts)).toString());
if (opts.hasArgument(reduces))
job.setNumReduceTasks(reduces.value(opts));
} catch (Exception e) {
Modified: avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/reflect/ReflectDatumReader.java Wed Jul 21 21:14:29 2010
@@ -74,7 +74,7 @@ public class ReflectDatumReader<T> exten
}
if (collectionClass.isAssignableFrom(ArrayList.class))
return new ArrayList();
- return newInstance(collectionClass);
+ return newInstance(collectionClass, schema);
}
Class elementClass = ReflectData.getClassProp(schema, ReflectData.ELEMENT_PROP);
if (elementClass == null)
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificData.java Wed Jul 21 21:14:29 2010
@@ -72,9 +72,10 @@ public class SpecificData extends Generi
private Map<String,Class> classCache = new ConcurrentHashMap<String,Class>();
+ private static final Class NO_CLASS = new Object(){}.getClass();
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
- /** Return the class that implements a schema. */
+ /** Return the class that implements a schema, or null if none exists. */
public Class getClass(Schema schema) {
switch (schema.getType()) {
case FIXED:
@@ -85,12 +86,12 @@ public class SpecificData extends Generi
if (c == null) {
try {
c = Class.forName(getClassName(schema));
- classCache.put(name, c);
} catch (ClassNotFoundException e) {
- throw new AvroRuntimeException(e);
+ c = NO_CLASS;
}
+ classCache.put(name, c);
}
- return c;
+ return c == NO_CLASS ? null : c;
case ARRAY: return GenericArray.class;
case MAP: return Map.class;
case UNION:
@@ -203,14 +204,11 @@ public class SpecificData extends Generi
public int compare(Object o1, Object o2, Schema s) {
switch (s.getType()) {
case ENUM:
- return ((Enum)o1).ordinal() - ((Enum)o2).ordinal();
+ if (!(o1 instanceof String)) // not generic
+ return ((Enum)o1).ordinal() - ((Enum)o2).ordinal();
default:
return super.compare(o1, o2, s);
}
}
}
-
-
-
-
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumReader.java Wed Jul 21 21:14:29 2010
@@ -39,46 +39,51 @@ public class SpecificDatumReader<T> exte
@Override
protected Object newRecord(Object old, Schema schema) {
Class c = SpecificData.get().getClass(schema);
- return (c.isInstance(old) ? old : newInstance(c));
- }
-
- @Override
- protected void setField(Object record, String name, int position, Object o) {
- ((SpecificRecord)record).put(position, o);
- }
- @Override
- protected Object getField(Object record, String name, int position) {
- return ((SpecificRecord)record).get(position);
+ if (c == null) return super.newRecord(old, schema); // punt to generic
+ return (c.isInstance(old) ? old : newInstance(c, schema));
}
@Override
@SuppressWarnings("unchecked")
protected Object createEnum(String symbol, Schema schema) {
- return Enum.valueOf(SpecificData.get().getClass(schema), symbol);
+ Class c = SpecificData.get().getClass(schema);
+ if (c == null) return super.createEnum(symbol, schema); // punt to generic
+ return Enum.valueOf(c, symbol);
}
@Override
protected Object createFixed(Object old, Schema schema) {
Class c = SpecificData.get().getClass(schema);
- return c.isInstance(old) ? old : newInstance(c);
+ if (c == null) return super.createFixed(old, schema); // punt to generic
+ return c.isInstance(old) ? old : newInstance(c, schema);
}
- private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+ private static final Class<?>[] NO_ARG = new Class[]{};
+ private static final Class<?>[] SCHEMA_ARG = new Class[]{Schema.class};
private static final Map<Class,Constructor> CTOR_CACHE =
new ConcurrentHashMap<Class,Constructor>();
- /** Create an instance of a class. */
+ /** Tag interface that indicates that a class has a one-argument constructor
+ * that accepts a Schema.
+ * @see SpecificDatumReader#newInstance
+ */
+ public interface SchemaConstructable {}
+
+ /** Create an instance of a class. If the class implements {@link
+ * SchemaConstructable}, call a constructor with a {@link
+ * org.apache.avro.Schema} parameter, otherwise use a no-arg constructor. */
@SuppressWarnings("unchecked")
- protected static Object newInstance(Class c) {
+ protected static Object newInstance(Class c, Schema s) {
+ boolean useSchema = SchemaConstructable.class.isAssignableFrom(c);
Object result;
try {
Constructor meth = (Constructor)CTOR_CACHE.get(c);
if (meth == null) {
- meth = c.getDeclaredConstructor(EMPTY_ARRAY);
+ meth = c.getDeclaredConstructor(useSchema ? SCHEMA_ARG : NO_ARG);
meth.setAccessible(true);
CTOR_CACHE.put(c, meth);
}
- result = meth.newInstance();
+ result = meth.newInstance(useSchema ? new Object[]{s} : (Object[])null);
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificDatumWriter.java Wed Jul 21 21:14:29 2010
@@ -44,14 +44,12 @@ public class SpecificDatumWriter<T> exte
}
@Override
- protected Object getField(Object record, String name, int position) {
- return ((SpecificRecord)record).get(position);
- }
-
- @Override
protected void writeEnum(Schema schema, Object datum, Encoder out)
throws IOException {
- out.writeEnum(((Enum)datum).ordinal());
+ if (!(datum instanceof Enum))
+ super.writeEnum(schema, datum, out); // punt to generic
+ else
+ out.writeEnum(((Enum)datum).ordinal());
}
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/package.html Wed Jul 21 21:14:29 2010
@@ -36,5 +36,10 @@ Generate specific Java classes for schem
</ul>
+<p>Note that when no generated class is found for a record, enum or
+ fixed schema, a {@link org.apache.avro.generic generic}
+ representation is used instead.  This permits generated classes to
+ be nested within generic data structures.
+
</body>
</html>
Modified: avro/trunk/lang/java/src/test/bin/test_tools.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/bin/test_tools.sh?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/bin/test_tools.sh (original)
+++ avro/trunk/lang/java/src/test/bin/test_tools.sh Wed Jul 21 21:14:29 2010
@@ -81,7 +81,7 @@ $CMD getschema $TMPDIR/data_file_write.a
| cmp -s - <(echo '"string"')
######################################################################
# Test tethered mapred
-$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema ../../share/test/schemas/WordCount.avsc --program build/test/wordcount.jar
+$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema src/test/java/org/apache/avro/mapred/tether/WordCount.avsc --program build/test/wordcount.jar
$CMD tojson build/test/mapred/tout/part-00000.avro \
| cmp -s - <($CMD tojson build/test/mapred/out/part-00000.avro)
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestSchema.java Wed Jul 21 21:14:29 2010
@@ -412,9 +412,9 @@ public class TestSchema {
assertFalse(s0.equals(s2));
}
- private static void checkBinary(Schema schema, Object datum,
- DatumWriter<Object> writer,
- DatumReader<Object> reader)
+ public static void checkBinary(Schema schema, Object datum,
+ DatumWriter<Object> writer,
+ DatumReader<Object> reader)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
writer.setSchema(schema);
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileReader;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import test.Weather;
+
+/** Tests mapred API with a specific record. */
+public class TestWeather {
+
+  /** Uses default mapper with no reduces for a map-only identity job. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testMapOnly() throws Exception {
+    JobConf job = new JobConf();
+    String inDir = System.getProperty("share.dir",".")+"/test/data";
+    Path input = new Path(inDir+"/weather.avro");
+    Path output = new Path(System.getProperty("test.dir",".")+"/weather-ident");
+
+    output.getFileSystem(job).delete(output);
+
+    job.setJobName("identity map weather");
+
+    AvroJob.setInputSchema(job, Weather.SCHEMA$);
+    AvroJob.setMapOutputSchema(job, Weather.SCHEMA$);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    FileOutputFormat.setCompressOutput(job, true);
+
+    job.setNumReduceTasks(0);                     // map-only
+
+    JobClient.runJob(job);
+
+    // an identity job must reproduce its input exactly, in order
+    checkOutput(new File(inDir+"/weather.avro"), output);
+  }
+
+  // maps input Weather to Pair<Weather,Void>, to sort by Weather
+  public static class SortMapper extends AvroMapper<Weather,Pair<Weather,Void>>{
+    @Override
+    public void map(Weather w, AvroCollector<Pair<Weather,Void>> collector,
+                    Reporter reporter) throws IOException {
+      collector.collect(new Pair<Weather,Void>(w, (Void)null));
+    }
+  }
+
+  // output keys only, since values are empty
+  public static class SortReducer
+    extends AvroReducer<Weather, Void, Weather> {
+    @Override
+    public void reduce(Weather w, Iterable<Void> ignore,
+                       AvroCollector<Weather> collector,
+                       Reporter reporter) throws IOException {
+      collector.collect(w);
+    }
+  }
+
+  /** Sorts weather records through the shuffle and compares the reduce
+   * output against a pre-sorted reference file. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSort() throws Exception {
+    JobConf job = new JobConf();
+    String inDir = System.getProperty("share.dir",".")+"/test/data";
+    Path input = new Path(inDir+"/weather.avro");
+    Path output = new Path(System.getProperty("test.dir",".")+"/weather-sort");
+
+    output.getFileSystem(job).delete(output);
+
+    job.setJobName("sort weather");
+
+    AvroJob.setInputSchema(job, Weather.SCHEMA$);
+    AvroJob.setMapOutputSchema
+      (job, Pair.getPairSchema(Weather.SCHEMA$, Schema.create(Type.NULL)));
+    AvroJob.setOutputSchema(job, Weather.SCHEMA$);
+
+    AvroJob.setMapperClass(job, SortMapper.class);
+    AvroJob.setReducerClass(job, SortReducer.class);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    FileOutputFormat.setCompressOutput(job, true);
+
+    JobClient.runJob(job);
+
+    // the sorted output must match the pre-sorted reference data
+    checkOutput(new File(inDir+"/weather-sorted.avro"), output);
+  }
+
+  /** Asserts that {@code output}/part-00000.avro holds exactly the records
+   * of {@code expected}, in the same order.  Fails if either file contains
+   * records the other lacks; both files are closed even when an assertion
+   * fails. */
+  private static void checkOutput(File expected, Path output)
+    throws IOException {
+    DatumReader<Weather> reader = new SpecificDatumReader<Weather>();
+    DataFileReader<Weather> check = new DataFileReader<Weather>(expected, reader);
+    try {
+      DataFileReader<Weather> actual = new DataFileReader<Weather>
+        (new File(output.toString()+"/part-00000.avro"), reader);
+      try {
+        for (Weather w : actual) {
+          assertTrue("output has more records than expected", check.hasNext());
+          assertEquals(check.next(), w);
+        }
+        assertFalse("output has fewer records than expected", check.hasNext());
+      } finally {
+        actual.close();
+      }
+    } finally {
+      check.close();
+    }
+  }
+
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java Wed Jul 21 21:14:29 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+/** Runs a word-count job through the Avro mapred API, emitting
+ * {@link Pair} records of (word, count). */
+public class TestWordCount {
+
+  /** Emits a (word, 1) pair for every whitespace-separated token. */
+  public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long> > {
+    @Override
+    public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
+                    Reporter reporter) throws IOException {
+      for (StringTokenizer st = new StringTokenizer(text.toString());
+           st.hasMoreTokens(); ) {
+        Utf8 word = new Utf8(st.nextToken());
+        collector.collect(new Pair<Utf8,Long>(word, 1L));
+      }
+    }
+  }
+
+  /** Sums the counts for each word; also used as the combiner. */
+  public static class ReduceImpl
+    extends AvroReducer<Utf8, Long, Pair<Utf8, Long> > {
+    @Override
+    public void reduce(Utf8 word, Iterable<Long> counts,
+                       AvroCollector<Pair<Utf8,Long>> collector,
+                       Reporter reporter) throws IOException {
+      long total = 0;
+      for (Long c : counts)
+        total += c;
+      collector.collect(new Pair<Utf8,Long>(word, total));
+    }
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJob() throws Exception {
+    String base = System.getProperty("test.dir", ".") + "/mapred";
+    Path in = new Path(base + "/in");
+    Path out = new Path(base + "/out");
+    JobConf conf = new JobConf();
+
+    // start from a clean output directory and a freshly written input file
+    out.getFileSystem(conf).delete(out);
+    WordCountUtil.writeLinesFile();
+
+    conf.setJobName("wordcount");
+
+    Schema countSchema = new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema();
+    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSchema(conf, countSchema);
+
+    AvroJob.setMapperClass(conf, MapImpl.class);
+    AvroJob.setCombinerClass(conf, ReduceImpl.class); // summing is associative
+    AvroJob.setReducerClass(conf, ReduceImpl.class);
+
+    FileInputFormat.setInputPaths(conf, in);
+    FileOutputFormat.setOutputPath(conf, out);
+    FileOutputFormat.setCompressOutput(conf, true);
+
+    JobClient.runJob(conf);
+
+    WordCountUtil.validateCountsFile();
+  }
+
+}
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=966422&r1=966421&r2=966422&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Wed Jul 21 21:14:29 2010
@@ -58,14 +58,14 @@ class WordCountUtil {
"the rain in spain falls mainly on the plains"
};
- private static final Map<String,Integer> COUNTS =
- new TreeMap<String,Integer>();
+ private static final Map<String,Long> COUNTS =
+ new TreeMap<String,Long>();
static {
for (String line : LINES) {
StringTokenizer tokens = new StringTokenizer(line);
while (tokens.hasMoreTokens()) {
String word = tokens.nextToken();
- int count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0;
+ long count = COUNTS.containsKey(word) ? COUNTS.get(word) : 0L;
count++;
COUNTS.put(word, count);
}
@@ -93,13 +93,15 @@ class WordCountUtil {
}
public static void validateCountsFile() throws IOException {
- DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();
+ DatumReader<Pair<Utf8,Long>> reader
+ = new SpecificDatumReader<Pair<Utf8,Long>>();
InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
- DataFileStream<WordCount> counts = new DataFileStream<WordCount>(in,reader);
+ DataFileStream<Pair<Utf8,Long>> counts
+ = new DataFileStream<Pair<Utf8,Long>>(in,reader);
int numWords = 0;
- for (WordCount wc : counts) {
- assertEquals(wc.word.toString(),
- (int)COUNTS.get(wc.word.toString()), wc.count);
+ for (Pair<Utf8,Long> wc : counts) {
+ assertEquals(wc.key().toString(),
+ COUNTS.get(wc.key().toString()), wc.value());
numWords++;
}
in.close();
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc?rev=966422&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCount.avsc Wed Jul 21 21:14:29 2010
@@ -0,0 +1,6 @@
+{"type":"record",
+ "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+ {"name":"key","type":"string"},
+ {"name":"value","type":"long","order":"ignore"}
+ ]
+}