You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/11/23 21:10:02 UTC
svn commit: r1038314 - in /avro/trunk: ./ lang/java/
lang/java/src/java/org/apache/avro/mapred/
lang/java/src/test/java/org/apache/avro/mapred/
Author: cutting
Date: Tue Nov 23 20:10:02 2010
New Revision: 1038314
URL: http://svn.apache.org/viewvc?rev=1038314&view=rev
Log:
AVRO-698. Java: Add MapReduce tests and documentation for jobs that mix Avro and non-Avro data.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/build.xml
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Nov 23 20:10:02 2010
@@ -36,6 +36,9 @@ Avro 1.5.0 (unreleased)
AVRO-648. Java: Use Velocity templates to generate specific code.
(philz via cutting)
+ AVRO-698. Java: Add MapReduce tests and documentation for jobs
+ that mix Avro and non-Avro data. (cutting)
+
BUG FIXES
AVRO-675. C: Bytes and fixed setters don't update datum size.
Modified: avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Tue Nov 23 20:10:02 2010
@@ -75,7 +75,7 @@
<property name="javadoc.link.servlet"
value="http://java.sun.com/products/servlet/2.3/javadoc/"/>
<property name="javadoc.link.hadoop"
- value="http://hadoop.apache.org/common/docs/current/api/"/>
+ value="http://hadoop.apache.org/common/docs/r0.20.0/api/"/>
<property name="javadoc.packages" value="org.${org}.${name}.*"/>
<property name="javac.encoding" value="ISO-8859-1"/>
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Tue Nov 23 20:10:02 2010
@@ -24,6 +24,8 @@ import java.io.UnsupportedEncodingExcept
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.avro.Schema;
/** Setters to configure jobs for Avro data. */
@@ -48,7 +50,7 @@ public class AvroJob {
/** Configure a job's map input schema. */
public static void setInputSchema(JobConf job, Schema s) {
job.set(INPUT_SCHEMA, s.toString());
- configureAvroJob(job);
+ configureAvroInput(job);
}
/** Return a job's map input schema. */
@@ -61,7 +63,7 @@ public class AvroJob {
* be a {@link Pair} schema. */
public static void setMapOutputSchema(JobConf job, Schema s) {
job.set(MAP_OUTPUT_SCHEMA, s.toString());
- configureAvroJob(job);
+ configureAvroShuffle(job);
}
/** Return a job's map output key schema. */
@@ -73,7 +75,7 @@ public class AvroJob {
* must be a {@link Pair} schema. */
public static void setOutputSchema(JobConf job, Schema s) {
job.set(OUTPUT_SCHEMA, s.toString());
- configureAvroJob(job);
+ configureAvroOutput(job);
}
/** Add metadata to job output files.*/
@@ -105,21 +107,32 @@ public class AvroJob {
return Schema.parse(job.get(OUTPUT_SCHEMA));
}
- private static void configureAvroJob(JobConf job) {
+ private static void configureAvroInput(JobConf job) {
if (job.get("mapred.input.format.class") == null)
job.setInputFormat(AvroInputFormat.class);
+
+ if (job.getMapperClass() == IdentityMapper.class)
+ job.setMapperClass(HadoopMapper.class);
+
+ configureAvroShuffle(job);
+ }
+
+ private static void configureAvroOutput(JobConf job) {
if (job.get("mapred.output.format.class") == null)
job.setOutputFormat(AvroOutputFormat.class);
+ if (job.getReducerClass() == IdentityReducer.class)
+ job.setReducerClass(HadoopReducer.class);
+
job.setOutputKeyClass(AvroWrapper.class);
+ configureAvroShuffle(job);
+ }
+
+ private static void configureAvroShuffle(JobConf job) {
job.setOutputKeyComparatorClass(AvroKeyComparator.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
-
- job.setMapperClass(HadoopMapper.class);
- job.setReducerClass(HadoopReducer.class);
-
// add AvroSerialization to io.serializations
Collection<String> serializations =
job.getStringCollection("io.serializations");
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Tue Nov 23 20:10:02 2010
@@ -23,10 +23,20 @@ Avro data, with map and reduce functions
<p>Avro data files do not contain key/value pairs as expected by
Hadoop's MapReduce API, but rather just a sequence of values. Thus
- we provide here a layer on top of Hadoop's MapReduce API which
- eliminates the key/value distinction.</p>
+ we provide here a layer on top of Hadoop's MapReduce API.</p>
-<p>To use this for jobs whose input and output are Avro data files:
+<p>In all cases, input and output paths are set and jobs are submitted
+ as with standard Hadoop jobs:
+ <ul>
+ <li>Specify input files with {@link
+ org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
+ <li>Specify an output directory with {@link
+ org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
+ <li>Run your job with {@link org.apache.hadoop.mapred.JobClient#runJob}</li>
+ </ul>
+</p>
+
+<p>For jobs whose input and output are Avro data files:
<ul>
<li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} and
{@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
@@ -38,11 +48,53 @@ Avro data, with map and reduce functions
this as your job's reducer and perhaps combiner, with {@link
org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
- <li>Specify input files with {@link org.apache.hadoop.mapred.FileInputFormat#setInputPaths}</li>
- <li>Specify an output directory with {@link
- org.apache.hadoop.mapred.FileOutputFormat#setOutputPath}</li>
- <li>Run your job with {@link org.apache.hadoop.mapred.JobClient#runJob}</li>
</ul>
</p>
+
+<p>For jobs whose input is an Avro data file and which use an {@link
+ org.apache.avro.mapred.AvroMapper}, but whose reducer is a non-Avro
+ {@link org.apache.hadoop.mapred.Reducer} and whose output is a
+ non-Avro format:
+ <ul>
+ <li>Call {@link org.apache.avro.mapred.AvroJob#setInputSchema} with your
+ job's input schema.</li>
+ <li>Subclass {@link org.apache.avro.mapred.AvroMapper} and specify
+ this as your job's mapper with {@link
+ org.apache.avro.mapred.AvroJob#setMapperClass}</li>
+ <li>Implement {@link org.apache.hadoop.mapred.Reducer} and specify
+ your job's reducer and combiner with {@link
+ org.apache.hadoop.mapred.JobConf#setReducerClass} and {@link
+ org.apache.hadoop.mapred.JobConf#setCombinerClass}. The input key
+ and value types should be {@link org.apache.avro.mapred.AvroKey} and {@link
+ org.apache.avro.mapred.AvroValue}.</li>
+  <li>Specify your job's output key and value types with {@link
+  org.apache.hadoop.mapred.JobConf#setOutputKeyClass} and {@link
+  org.apache.hadoop.mapred.JobConf#setOutputValueClass}.</li>
+  <li>Specify your job's output format with {@link
+  org.apache.hadoop.mapred.JobConf#setOutputFormat}.</li>
+ </ul>
+</p>
+
+<p>For jobs whose input is a non-Avro data file and which use a
+ non-Avro {@link org.apache.hadoop.mapred.Mapper}, but whose reducer
+ is an {@link org.apache.avro.mapred.AvroReducer} and whose output is
+ an Avro data file:
+ <ul>
+ <li>Set your input file format with {@link
+ org.apache.hadoop.mapred.JobConf#setInputFormat}.</li>
+ <li>Implement {@link org.apache.hadoop.mapred.Mapper} and specify
+ your job's mapper with {@link
+ org.apache.hadoop.mapred.JobConf#setMapperClass}. The output key
+  and value types should be {@link org.apache.avro.mapred.AvroKey} and
+ {@link org.apache.avro.mapred.AvroValue}.</li>
+ <li>Subclass {@link org.apache.avro.mapred.AvroReducer} and specify
+ this as your job's reducer and perhaps combiner, with {@link
+ org.apache.avro.mapred.AvroJob#setReducerClass} and {@link
+ org.apache.avro.mapred.AvroJob#setCombinerClass}</li>
+ <li>Call {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+ job's output schema.</li>
+ </ul>
+</p>
+
</body>
</html>
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Tue Nov 23 20:10:02 2010
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEqu
import java.io.IOException;
import java.io.File;
import java.net.URI;
+import java.util.Iterator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.conf.Configuration;
@@ -33,11 +34,19 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.avro.Schema;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,6 +58,10 @@ public class TestSequenceFileReader {
= new File(System.getProperty("test.dir", "/tmp"));
private static final File FILE = new File(DIR, "test.seq");
+ private static final Schema SCHEMA
+ = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+ Schema.create(Schema.Type.STRING));
+
@BeforeClass
public static void testWriteSequenceFile() throws IOException {
FILE.delete();
@@ -91,15 +104,16 @@ public class TestSequenceFileReader {
output.getFileSystem(job).delete(output);
- Schema schema = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
- Schema.create(Schema.Type.STRING));
-
+ // configure input for Avro from sequence file
AvroJob.setInputSequenceFile(job);
+ FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+ AvroJob.setInputSchema(job, SCHEMA);
- AvroJob.setInputSchema(job, schema);
- AvroJob.setOutputSchema(job, schema);
+ // mapper is default, identity
+ // reducer is default, identity
- FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+ // configure output for avro
+ AvroJob.setOutputSchema(job, SCHEMA);
FileOutputFormat.setOutputPath(job, output);
JobClient.runJob(job);
@@ -109,4 +123,88 @@ public class TestSequenceFileReader {
new SpecificDatumReader<Pair<Long,CharSequence>>()));
}
+ private static class NonAvroMapper
+ extends MapReduceBase
+ implements Mapper<LongWritable,Text,AvroKey<Long>,AvroValue<Utf8>> {
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<AvroKey<Long>,AvroValue<Utf8>> out,
+ Reporter reporter) throws IOException {
+ out.collect(new AvroKey<Long>(key.get()),
+ new AvroValue<Utf8>(new Utf8(value.toString())));
+ }
+ }
+
+ @Test
+ public void testNonAvroMapper() throws Exception {
+ JobConf job = new JobConf();
+ Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+ output.getFileSystem(job).delete(output);
+
+ // configure input for non-Avro sequence file
+ job.setInputFormat(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+ // use a hadoop mapper that emits Avro output
+ job.setMapperClass(NonAvroMapper.class);
+
+ // reducer is default, identity
+
+ // configure output for avro
+ FileOutputFormat.setOutputPath(job, output);
+ AvroJob.setOutputSchema(job, SCHEMA);
+
+ JobClient.runJob(job);
+
+ checkFile(new DataFileReader<Pair<Long,CharSequence>>
+ (new File(output.toString()+"/part-00000.avro"),
+ new SpecificDatumReader<Pair<Long,CharSequence>>()));
+ }
+
+ private static class NonAvroReducer
+ extends MapReduceBase
+ implements Reducer<AvroKey<Long>,AvroValue<Utf8>,LongWritable,Text> {
+
+ public void reduce(AvroKey<Long> key, Iterator<AvroValue<Utf8>> values,
+ OutputCollector<LongWritable, Text> out,
+ Reporter reporter) throws IOException {
+ while (values.hasNext()) {
+ AvroValue<Utf8> value = values.next();
+ out.collect(new LongWritable(key.datum()),
+ new Text(value.datum().toString()));
+ }
+ }
+ }
+
+ @Test
+ public void testNonAvroReducer() throws Exception {
+ JobConf job = new JobConf();
+ Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+ output.getFileSystem(job).delete(output);
+
+ // configure input for Avro from sequence file
+ AvroJob.setInputSequenceFile(job);
+ AvroJob.setInputSchema(job, SCHEMA);
+ FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+ // mapper is default, identity
+
+ // use a hadoop reducer that consumes Avro input
+ AvroJob.setMapOutputSchema(job, SCHEMA);
+ job.setReducerClass(NonAvroReducer.class);
+
+ // configure output for non-Avro SequenceFile
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, output);
+
+ // output key/value classes are default, LongWritable/Text
+
+ JobClient.runJob(job);
+
+ checkFile(new SequenceFileReader<Long,CharSequence>
+ (new File(output.toString()+"/part-00000")));
+ }
+
}
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1038314&r1=1038313&r2=1038314&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWeather.java Tue Nov 23 20:10:02 2010
@@ -56,7 +56,7 @@ public class TestWeather {
job.setJobName("identity map weather");
AvroJob.setInputSchema(job, Weather.SCHEMA$);
- AvroJob.setMapOutputSchema(job, Weather.SCHEMA$);
+ AvroJob.setOutputSchema(job, Weather.SCHEMA$);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);