You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/02/05 00:34:03 UTC
svn commit: r1564562 - in /avro/trunk: ./
lang/java/mapred/src/main/java/org/apache/avro/mapred/
lang/java/mapred/src/test/java/org/apache/avro/mapred/
Author: cutting
Date: Tue Feb 4 23:34:02 2014
New Revision: 1564562
URL: http://svn.apache.org/r1564562
Log:
AVRO-1439. Java: Add AvroMultipleInputs for mapred. Contributed by Harsh J.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java (with props)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java (with props)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java (with props)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java (with props)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java (with props)
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1564562&r1=1564561&r2=1564562&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 4 23:34:02 2014
@@ -4,6 +4,8 @@ Trunk (not yet released)
NEW FEATURES
+ AVRO-1439. Java: Add AvroMultipleInputs for mapred. (Harsh J via cutting)
+
OPTIMIZATIONS
IMPROVEMENTS
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class supports Avro-MapReduce jobs that have multiple input paths with
+ * a different {@link Schema} and {@link AvroMapper} for each path.
+ *
+ * <p>
+ * Usage:
+ * </p>
+ * <p>
+ * <strong>Case 1: (ReflectData based inputs)</strong>
+ * </p>
+ * <pre>
+ * // Enable ReflectData usage across job.
+ * AvroJob.setReflect(job);
+ *
+ * Schema type1Schema = ReflectData.get().getSchema(Type1Record.class);
+ * AvroMultipleInputs.addInputPath(job, inputPath1, Type1AvroMapper.class, type1Schema);
+ * </pre>
+ *
+ * Where Type1AvroMapper would be implemented as
+ * <pre>
+ * class Type1AvroMapper extends AvroMapper&lt;Type1Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <pre>
+ * Schema type2Schema = ReflectData.get().getSchema(Type2Record.class);
+ * AvroMultipleInputs.addInputPath(job, inputPath2, Type2AvroMapper.class, type2Schema);
+ * </pre>
+ *
+ * Where Type2AvroMapper would be implemented as
+ * <pre>
+ * class Type2AvroMapper extends AvroMapper&lt;Type2Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <p>
+ * <strong>Case 2: (SpecificData based inputs)</strong>
+ * </p>
+ *
+ * <pre>
+ * Schema type1Schema = Type1Record.SCHEMA$;
+ * AvroMultipleInputs.addInputPath(job, inputPath1, Type1AvroMapper.class, type1Schema);
+ * </pre>
+ *
+ * Where Type1AvroMapper would be implemented as
+ * <pre>
+ * class Type1AvroMapper extends AvroMapper&lt;Type1Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <pre>
+ * Schema type2Schema = Type2Record.SCHEMA$;
+ * AvroMultipleInputs.addInputPath(job, inputPath2, Type2AvroMapper.class, type2Schema);
+ * </pre>
+ *
+ * Where Type2AvroMapper would be implemented as
+ * <pre>
+ * class Type2AvroMapper extends AvroMapper&lt;Type2Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <p>
+ * <strong>Note on InputFormat:</strong>
+ * The InputFormat used will always be {@link AvroInputFormat} when using this class.
+ * </p>
+ * <p>
+ * <strong>Note on collector outputs:</strong>
+ * When using this class, you will need to ensure that the mapper implementations
+ * involved all emit the same Key type and Value
+ * record types, as set by {@link AvroJob#setOutputSchema(JobConf, Schema)}
+ * or {@link AvroJob#setMapOutputSchema(JobConf, Schema)}.
+ * </p>
+ * <p>
+ * <strong>Note on paths:</strong>
+ * Registered paths are stored in the job configuration as a comma-separated
+ * list, so input paths containing a comma are not supported.
+ * </p>
+ */
+public class AvroMultipleInputs {
+  private static final String SCHEMA_KEY =
+      "avro.mapreduce.input.multipleinputs.dir.schemas";
+  private static final String MAPPERS_KEY =
+      "avro.mapreduce.input.multipleinputs.dir.mappers";
+
+  /** Fixed charset for schema text, so encoding never depends on the platform default. */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  /**
+   * Add a {@link Path} with a custom {@link Schema} to the list of
+   * inputs for the map-reduce job.
+   *
+   * <p>The schema is Base64-encoded before being stored, because its JSON
+   * text can contain the ',' and ';' characters used as separators in the
+   * configuration value.
+   *
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputSchema {@link Schema} to use for this path
+   */
+  private static void addInputPath(JobConf conf, Path path,
+      Schema inputSchema) {
+    appendMapping(conf, SCHEMA_KEY,
+        path.toString() + ";" + toBase64(inputSchema.toString()));
+    conf.setInputFormat(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link Schema} and
+   * {@link AvroMapper} to the list of inputs for the map-reduce job.
+   * Also installs {@link DelegatingInputFormat} and {@link DelegatingMapper}
+   * as the job's input format and mapper.
+   *
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job;
+   *        must not contain a comma
+   * @param mapperClass {@link AvroMapper} class to use for this path
+   * @param inputSchema {@link Schema} to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends AvroMapper> mapperClass,
+      Schema inputSchema) {
+    addInputPath(conf, path, inputSchema);
+    appendMapping(conf, MAPPERS_KEY,
+        path.toString() + ";" + mapperClass.getName());
+    conf.setMapperClass(DelegatingMapper.class);
+  }
+
+  /** Appends {@code mapping} to the comma-separated list stored under {@code key}. */
+  private static void appendMapping(JobConf conf, String key, String mapping) {
+    String existing = conf.get(key);
+    conf.set(key, existing == null ? mapping : existing + "," + mapping);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link AvroMapper} class that
+   * should be used for them.
+   *
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Schema)
+   * @return A map of paths-to-mappers for the job; empty if none were registered
+   * @throws RuntimeException if a registered class cannot be loaded or is not an AvroMapper
+   */
+  static Map<Path, Class<? extends AvroMapper>> getMapperTypeMap(JobConf conf) {
+    String mappings = conf.get(MAPPERS_KEY);
+    if (mappings == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends AvroMapper>> m =
+        new HashMap<Path, Class<? extends AvroMapper>>();
+    for (String pathMapping : mappings.split(",")) {
+      int sep = separatorIndex(pathMapping);
+      Class<? extends AvroMapper> mapClass;
+      try {
+        // asSubclass fails fast if the stored class is not an AvroMapper.
+        mapClass = conf.getClassByName(pathMapping.substring(sep + 1))
+            .asSubclass(AvroMapper.class);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      m.put(new Path(pathMapping.substring(0, sep)), mapClass);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Schema} that
+   * should be used for them.
+   *
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Schema)
+   * @return A map of paths to schemas for the job; empty if none were registered
+   * @throws RuntimeException if a stored schema cannot be parsed
+   */
+  static Map<Path, Schema> getInputSchemaMap(JobConf conf) {
+    String mappings = conf.get(SCHEMA_KEY);
+    if (mappings == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Schema> m = new HashMap<Path, Schema>();
+    for (String schemaMapping : mappings.split(",")) {
+      int sep = separatorIndex(schemaMapping);
+      String schemaString = fromBase64(schemaMapping.substring(sep + 1));
+      Schema inputSchema;
+      try {
+        // Use a fresh parser per entry: a Schema.Parser remembers the named
+        // types it has parsed and rejects redefinitions, so a shared parser
+        // fails when two paths are registered with the same record schema.
+        inputSchema = new Schema.Parser().parse(schemaString);
+      } catch (SchemaParseException e) {
+        throw new RuntimeException(e);
+      }
+      m.put(new Path(schemaMapping.substring(0, sep)), inputSchema);
+    }
+    return m;
+  }
+
+  /**
+   * Returns the index of the ';' that separates the path from its value in a
+   * single {@code path;value} entry. The last ';' is used because neither
+   * Base64 text nor Java class names contain ';', while a path may.
+   *
+   * @throws IllegalStateException if the entry has no ';' separator
+   */
+  private static int separatorIndex(String mapping) {
+    int sep = mapping.lastIndexOf(';');
+    if (sep < 0) {
+      throw new IllegalStateException(
+          "Malformed AvroMultipleInputs entry: " + mapping);
+    }
+    return sep;
+  }
+
+  /** Base64-encodes the UTF-8 bytes of {@code rawString}, without line chunking. */
+  private static String toBase64(String rawString) {
+    return new String(Base64.encodeBase64(rawString.getBytes(UTF8)), UTF8);
+  }
+
+  /** Inverse of {@link #toBase64(String)}. */
+  private static String fromBase64(String base64String) {
+    return new String(Base64.decodeBase64(base64String.getBytes(UTF8)), UTF8);
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates read behavior of paths based on
+ * their associated avro schema. Paths registered through
+ * {@link AvroMultipleInputs} are grouped by schema and then by mapper; each
+ * group is split with {@link AvroInputFormat}, and every resulting split is
+ * wrapped in a {@link TaggedInputSplit} carrying its schema and mapper.
+ *
+ * @see AvroMultipleInputs#addInputPath(JobConf, Path, Class, Schema)
+ */
+class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
+
+  @SuppressWarnings("unchecked")
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+    JobConf confCopy = new JobConf(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    Map<Path, Class<? extends AvroMapper>> mapperMap =
+        AvroMultipleInputs.getMapperTypeMap(conf);
+    Map<Path, Schema> schemaMap = AvroMultipleInputs.getInputSchemaMap(conf);
+
+    // Every path is read with AvroInputFormat, so one instance serves all groups.
+    InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+        AvroInputFormat.class, conf);
+
+    for (Entry<Schema, List<Path>> schemaEntry
+        : groupBySchema(schemaMap).entrySet()) {
+      Schema schema = schemaEntry.getKey();
+      Map<Class<? extends AvroMapper>, List<Path>> mapperPaths =
+          groupByMapper(schemaEntry.getValue(), mapperMap);
+
+      // Paths that share both a schema and a mapper are split together.
+      for (Entry<Class<? extends AvroMapper>, List<Path>> mapperEntry
+          : mapperPaths.entrySet()) {
+        List<Path> paths = mapperEntry.getValue();
+        Class<? extends AvroMapper> mapperClass = mapperEntry.getKey();
+        if (mapperClass == null) {
+          // NOTE(review): only reached for a path that has a schema but no
+          // mapper entry. AvroMultipleInputs.addInputPath sets the job mapper
+          // to DelegatingMapper, which is not an AvroMapper, so this fallback
+          // would likely fail at map time - confirm whether it is needed.
+          mapperClass = (Class<? extends AvroMapper>) conf.getMapperClass();
+        }
+
+        FileInputFormat.setInputPaths(confCopy,
+            paths.toArray(new Path[paths.size()]));
+
+        // Tag each split with its input format, mapper and schema so that the
+        // task side can rebuild the matching reader and mapper.
+        for (InputSplit pathSplit : format.getSplits(confCopy, numSplits)) {
+          splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+              mapperClass, schema));
+        }
+      }
+    }
+
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
+
+  /** Groups the registered input paths by their schema. */
+  private static Map<Schema, List<Path>> groupBySchema(
+      Map<Path, Schema> schemaMap) {
+    Map<Schema, List<Path>> schemaPaths = new HashMap<Schema, List<Path>>();
+    for (Entry<Path, Schema> entry : schemaMap.entrySet()) {
+      List<Path> paths = schemaPaths.get(entry.getValue());
+      if (paths == null) {
+        paths = new ArrayList<Path>();
+        schemaPaths.put(entry.getValue(), paths);
+      }
+      paths.add(entry.getKey());
+    }
+    return schemaPaths;
+  }
+
+  /**
+   * Groups paths that share a schema by their mapper class. Paths without a
+   * registered mapper are collected under the {@code null} key.
+   */
+  private static Map<Class<? extends AvroMapper>, List<Path>> groupByMapper(
+      List<Path> paths, Map<Path, Class<? extends AvroMapper>> mapperMap) {
+    Map<Class<? extends AvroMapper>, List<Path>> mapperPaths =
+        new HashMap<Class<? extends AvroMapper>, List<Path>>();
+    for (Path path : paths) {
+      Class<? extends AvroMapper> mapperClass = mapperMap.get(path);
+      List<Path> group = mapperPaths.get(mapperClass);
+      if (group == null) {
+        group = new ArrayList<Path>();
+        mapperPaths.put(mapperClass, group);
+      }
+      group.add(path);
+    }
+    return mapperPaths;
+  }
+
+  /**
+   * Builds the record reader for a {@link TaggedInputSplit}: the split's
+   * schema is set as the Avro input schema in {@code conf} before the tagged
+   * input format creates a reader for the wrapped split.
+   */
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    Schema schema = taggedInputSplit.getSchema();
+    AvroJob.setInputSchema(conf, schema);
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+        .newInstance(taggedInputSplit.getInputFormatClass(), conf);
+    return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
+        reporter);
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates behaviour of paths to multiple other
+ * mappers. Similar to {@link HadoopMapper}, but instantiates map classes
+ * in the map() call instead of during configure(), as we rely on the split
+ * object to provide us that information.
+ *
+ * @see AvroMultipleInputs#addInputPath(JobConf, org.apache.hadoop.fs.Path, Class, org.apache.avro.Schema)
+ */
+class DelegatingMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase
+    implements Mapper<AvroWrapper<IN>,NullWritable,KO,VO> {
+  AvroMapper<IN, OUT> mapper;
+  JobConf conf;
+  boolean isMapOnly;
+  AvroCollector<OUT> out;
+
+  @Override
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    this.isMapOnly = conf.getNumReduceTasks() == 0;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void map(AvroWrapper<IN> wrapper, NullWritable value,
+      OutputCollector<KO, VO> collector, Reporter reporter)
+      throws IOException {
+    if (mapper == null) {
+      // The delegate's class is only known from the TaggedInputSplit being read.
+      TaggedInputSplit is = (TaggedInputSplit) reporter.getInputSplit();
+      Class<? extends AvroMapper> mapperClass = is.getMapperClass();
+      mapper = (AvroMapper<IN,OUT>)
+          ReflectionUtils.newInstance(mapperClass, conf);
+    }
+    if (out == null) {
+      out = new MapCollector<OUT,K,V,KO,VO>(collector, isMapOnly);
+    }
+    mapper.map(wrapper.datum(), out, reporter);
+  }
+
+  /**
+   * Closes the delegate {@link AvroMapper}, if one was created, so that its
+   * {@code close()} hook runs at the end of the task. Without this override
+   * the inherited no-op {@link MapReduceBase#close()} would be used and the
+   * delegate would never be closed.
+   */
+  @Override
+  public void close() throws IOException {
+    if (mapper != null) {
+      mapper.close();
+    }
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1564562&r1=1564561&r2=1564562&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Tue Feb 4 23:34:02 2014
@@ -35,7 +35,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
implements Mapper<AvroWrapper<IN>, NullWritable, KO, VO> {
private AvroMapper<IN,OUT> mapper;
- private MapCollector out;
+ private MapCollector<OUT,K,V,KO,VO> out;
private boolean isMapOnly;
@Override @SuppressWarnings("unchecked")
@@ -47,36 +47,12 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
this.isMapOnly = conf.getNumReduceTasks() == 0;
}
- @SuppressWarnings("unchecked")
- private class MapCollector extends AvroCollector<OUT> {
- private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
- private final AvroKey<K> keyWrapper = new AvroKey(null);
- private final AvroValue<V> valueWrapper = new AvroValue(null);
- private OutputCollector<KO,VO> collector;
-
- public MapCollector(OutputCollector<KO,VO> collector) {
- this.collector = collector;
- }
-
- public void collect(OUT datum) throws IOException {
- if (isMapOnly) {
- wrapper.datum(datum);
- collector.collect((KO)wrapper, (VO)NullWritable.get());
- } else { // split a pair
- Pair<K,V> pair = (Pair<K,V>)datum;
- keyWrapper.datum(pair.key());
- valueWrapper.datum(pair.value());
- collector.collect((KO)keyWrapper, (VO)valueWrapper);
- }
- }
- }
-
@Override
public void map(AvroWrapper<IN> wrapper, NullWritable value,
OutputCollector<KO,VO> collector,
Reporter reporter) throws IOException {
if (this.out == null)
- this.out = new MapCollector(collector);
+ this.out = new MapCollector<OUT,K,V,KO,VO>(collector, isMapOnly);
mapper.map(wrapper.datum(), out, reporter);
}
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * Adapts an {@link AvroMapper}'s {@link AvroCollector} output onto a Hadoop
+ * {@link OutputCollector}.
+ *
+ * <p>In a map-only job each datum is wrapped in an {@link AvroWrapper} and
+ * emitted with a {@link NullWritable} value. Otherwise each datum must be a
+ * {@link Pair}, whose key and value are emitted wrapped in an
+ * {@link AvroKey} and an {@link AvroValue}. The wrapper objects are reused
+ * for every call.
+ */
+class MapCollector<OUT,K,V,KO,VO> extends AvroCollector<OUT> {
+  private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
+  private final AvroKey<K> keyWrapper = new AvroKey<K>(null);
+  private final AvroValue<V> valueWrapper = new AvroValue<V>(null);
+  private final OutputCollector<KO,VO> collector;
+  private final boolean isMapOnly;
+
+  /**
+   * @param collector the Hadoop collector that receives the wrapped output
+   * @param isMapOnly whether the job has no reduce phase; if so each datum is
+   *        emitted whole rather than split into a key/value pair
+   */
+  public MapCollector(OutputCollector<KO,VO> collector, boolean isMapOnly) {
+    this.collector = collector;
+    this.isMapOnly = isMapOnly;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void collect(OUT datum) throws IOException {
+    if (!isMapOnly) {
+      // Output headed for reducers: unpack the pair into key and value.
+      Pair<K,V> pair = (Pair<K,V>) datum;
+      keyWrapper.datum(pair.key());
+      valueWrapper.datum(pair.value());
+      collector.collect((KO) keyWrapper, (VO) valueWrapper);
+      return;
+    }
+    wrapper.datum(datum);
+    collector.collect((KO) wrapper, (VO) NullWritable.get());
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s: the
+ * {@link InputFormat} that reads it, the {@link AvroMapper} that processes
+ * it, and the input {@link Schema}.
+ */
+class TaggedInputSplit implements Configurable, InputSplit {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  private Class<? extends InputFormat> inputFormatClass;
+
+  private Class<? extends AvroMapper> mapperClass;
+
+  private Schema schema;
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // No-arg constructor; the fields are populated later by readFields().
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   *
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this job
+   * @param mapperClass The Mapper class to use for this job
+   * @param inputSchema The input schema for the records in this split
+   */
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends AvroMapper> mapperClass,
+      Schema inputSchema) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+    this.schema = inputSchema;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   *
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   *
+   * @return The InputFormat class to use
+   */
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   *
+   * @return The Mapper class to use
+   */
+  public Class<? extends AvroMapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  /**
+   * Retrieves the Schema to use for this split.
+   *
+   * @return The schema for record readers to use
+   */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public long getLength() throws IOException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException {
+    return inputSplit.getLocations();
+  }
+
+  /**
+   * Deserializes the fields written by {@link #write(DataOutput)}. Class names
+   * are resolved through this split's configuration, which must be set first.
+   */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+        .newInstance(inputSplitClass, conf);
+    inputSplit.readFields(in);
+    inputFormatClass = (Class<? extends InputFormat>) readClass(in);
+    mapperClass = (Class<? extends AvroMapper>) readClass(in);
+    String schemaString = Text.readString(in);
+    // A new parser per call: Schema.Parser rejects redefining a named type,
+    // so a parser kept across calls would fail when this Writable is reused
+    // to read another split carrying the same record schema.
+    schema = new Schema.Parser().parse(schemaString);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  /**
+   * Serializes the wrapped split's class name and contents, followed by the
+   * input format class name, the mapper class name and the schema JSON.
+   */
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    inputSplit.write(out);
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+    Text.writeString(out, schema.toString());
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return inputSplit.toString();
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java Tue Feb 4 23:34:02 2014
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * End-to-end test for {@link AvroMultipleInputs}. Two input directories with
+ * different reflect schemas (names and balances) are each processed by their
+ * own mapper. The mapper outputs are joined on the record id by a single
+ * reducer, and the joined output file is then validated.
+ */
+public class TestAvroMultipleInputs {
+
+  /** The input-1 record: an id and a name. */
+  public static class NamesRecord {
+    // ReflectData derives the Avro schema from these fields.
+    private int id = -1;
+    private CharSequence name = "";
+
+    public NamesRecord() {}
+
+    public NamesRecord(int id, CharSequence name) {
+      this.id = id;
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return id + "\t" + name;
+    }
+  }
+
+  /** The input-2 record: an id and a balance. */
+  public static class BalancesRecord {
+    // ReflectData derives the Avro schema from these fields.
+    private int id = -1;
+    private long balance = 0L;
+
+    public BalancesRecord() {}
+
+    public BalancesRecord(int id, long balance) {
+      this.id = id;
+      this.balance = balance;
+    }
+
+    @Override
+    public String toString() {
+      return id + "\t" + balance;
+    }
+  }
+
+  /** The map output key record: the id both inputs are joined on. */
+  public static class KeyRecord {
+    private int id = -1;
+
+    public KeyRecord() {}
+
+    public KeyRecord(int id) {
+      this.id = id;
+    }
+
+    @Override
+    public String toString() {
+      return Integer.toString(id);
+    }
+  }
+
+  /**
+   * The common map output value record. {@code recType} carries the class
+   * name of the source record, so the reducer can tell which of the other
+   * fields are meaningful.
+   */
+  public static class JoinableRecord {
+    private int id = -1;
+    private CharSequence name = "";
+    private long balance = 0L;
+    private CharSequence recType = "";
+
+    public JoinableRecord() {}
+
+    public JoinableRecord(
+        CharSequence recType,
+        int id,
+        CharSequence name,
+        long balance) {
+      this.recType = recType;
+      this.id = id;
+      this.name = name;
+      this.balance = balance;
+    }
+
+    @Override
+    public String toString() {
+      return recType.toString();
+    }
+  }
+
+  /** The output record: the joined id, name and balance. */
+  public static class CompleteRecord {
+    private int id = -1;
+    private CharSequence name = "";
+    private long balance = 0L;
+
+    public CompleteRecord() {}
+
+    public CompleteRecord(int id, CharSequence name, long balance) {
+      this.id = id;
+      this.name = name;
+      this.balance = balance;
+    }
+
+    void setId(int id) { this.id = id; }
+
+    void setName(CharSequence name) { this.name = name; }
+
+    void setBalance(long balance) { this.balance = balance; }
+
+    @Override
+    public String toString() {
+      return id + "\t" + name + "\t" + balance;
+    }
+  }
+
+  /** Emits (id, value tagged with the names-record class name) per input. */
+  public static class NamesMapImpl
+      extends AvroMapper<NamesRecord, Pair<KeyRecord, JoinableRecord>> {
+
+    @Override
+    public void map(
+        NamesRecord nameRecord,
+        AvroCollector<Pair<KeyRecord, JoinableRecord>> collector,
+        Reporter reporter) throws IOException {
+      collector.collect(
+          new Pair<KeyRecord, JoinableRecord>(
+              new KeyRecord(nameRecord.id),
+              new JoinableRecord(nameRecord.getClass().getName(),
+                  nameRecord.id, nameRecord.name, -1L)));
+    }
+
+  }
+
+  /** Emits (id, value tagged with the balances-record class name) per input. */
+  public static class BalancesMapImpl
+      extends AvroMapper<BalancesRecord, Pair<KeyRecord, JoinableRecord>> {
+
+    @Override
+    public void map(
+        BalancesRecord balanceRecord,
+        AvroCollector<Pair<KeyRecord, JoinableRecord>> collector,
+        Reporter reporter) throws IOException {
+      collector.collect(
+          new Pair<KeyRecord, JoinableRecord>(
+              new KeyRecord(balanceRecord.id),
+              new JoinableRecord(balanceRecord.getClass().getName(),
+                  balanceRecord.id, "", balanceRecord.balance)));
+    }
+
+  }
+
+  /**
+   * Joins all values for one id. The name is taken from the value tagged as
+   * a {@link NamesRecord}, and the balance from any other value.
+   */
+  public static class ReduceImpl
+      extends AvroReducer<KeyRecord, JoinableRecord, CompleteRecord> {
+
+    @Override
+    public void reduce(KeyRecord key, Iterable<JoinableRecord> joinables,
+        AvroCollector<CompleteRecord> collector,
+        Reporter reporter) throws IOException {
+      // NamesMapImpl tags its values with the full class name; compare the
+      // whole tag rather than a substring of it.
+      String namesTag = NamesRecord.class.getName();
+      CompleteRecord rec = new CompleteRecord();
+      for (JoinableRecord joinable : joinables) {
+        rec.setId(joinable.id);
+        if (namesTag.equals(joinable.recType.toString())) {
+          rec.setName(joinable.name);
+        } else {
+          rec.setBalance(joinable.balance);
+        }
+      }
+      collector.collect(rec);
+    }
+
+  }
+
+  @Test
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    // The separator is required: without it the default test.dir of "."
+    // produced ".target/..." and any other test.dir was glued directly
+    // onto "target".
+    String dir = System.getProperty("test.dir", ".")
+        + "/target/testAvroMultipleInputs";
+    Path inputPath1 = new Path(dir + "/in1");
+    Path inputPath2 = new Path(dir + "/in2");
+    Path outputPath = new Path(dir + "/out");
+
+    // Remove leftovers from earlier runs before writing fresh inputs.
+    outputPath.getFileSystem(job).delete(outputPath, true);
+    inputPath1.getFileSystem(job).delete(inputPath1, true);
+    inputPath2.getFileSystem(job).delete(inputPath2, true);
+
+    writeNamesFiles(new File(inputPath1.toUri().getPath()));
+    writeBalancesFiles(new File(inputPath2.toUri().getPath()));
+
+    job.setJobName("multiple-inputs-join");
+    // Each input directory gets its own mapper and reader schema.
+    AvroMultipleInputs.addInputPath(job, inputPath1, NamesMapImpl.class,
+        ReflectData.get().getSchema(NamesRecord.class));
+    AvroMultipleInputs.addInputPath(job, inputPath2, BalancesMapImpl.class,
+        ReflectData.get().getSchema(BalancesRecord.class));
+
+    Schema keySchema = ReflectData.get().getSchema(KeyRecord.class);
+    Schema valueSchema = ReflectData.get().getSchema(JoinableRecord.class);
+    AvroJob.setMapOutputSchema(job,
+        Pair.getPairSchema(keySchema, valueSchema));
+    AvroJob.setOutputSchema(job,
+        ReflectData.get().getSchema(CompleteRecord.class));
+
+    AvroJob.setReducerClass(job, ReduceImpl.class);
+    // A single reducer produces the single part file validated below.
+    job.setNumReduceTasks(1);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    AvroJob.setReflect(job);
+
+    JobClient.runJob(job);
+
+    validateCompleteFile(new File(new File(dir, "out"), "part-00000.avro"));
+  }
+
+  /**
+   * Writes a "names.avro" file with five sequential (id, name) records into
+   * {@code dir}, creating the directory if needed.
+   */
+  private void writeNamesFiles(File dir) throws IOException {
+    DatumWriter<NamesRecord> writer = new ReflectDatumWriter<NamesRecord>();
+    DataFileWriter<NamesRecord> out = new DataFileWriter<NamesRecord>(writer);
+    File namesFile = new File(dir, "names.avro");
+    dir.mkdirs();
+    // Close the writer even if create() or append() fails.
+    try {
+      out.create(ReflectData.get().getSchema(NamesRecord.class), namesFile);
+      for (int i = 0; i < 5; i++) {
+        out.append(new NamesRecord(i, "record" + i));
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Writes a "balances.avro" file with five sequential (id, balance) records,
+   * where balance = id + 100, into {@code dir}, creating it if needed.
+   */
+  private void writeBalancesFiles(File dir) throws IOException {
+    DatumWriter<BalancesRecord> writer =
+        new ReflectDatumWriter<BalancesRecord>();
+    DataFileWriter<BalancesRecord> out =
+        new DataFileWriter<BalancesRecord>(writer);
+    File balancesFile = new File(dir, "balances.avro");
+    dir.mkdirs();
+    // Close the writer even if create() or append() fails.
+    try {
+      out.create(ReflectData.get().getSchema(BalancesRecord.class),
+          balancesFile);
+      for (int i = 0; i < 5; i++) {
+        out.append(new BalancesRecord(i, 100L + i));
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Checks that {@code file} holds exactly five joined records in ascending
+   * id order, each carrying the name and balance written for that id.
+   */
+  private void validateCompleteFile(File file) throws Exception {
+    DatumReader<CompleteRecord> reader =
+        new ReflectDatumReader<CompleteRecord>();
+    InputStream in = new BufferedInputStream(new FileInputStream(file));
+    DataFileStream<CompleteRecord> records = null;
+    int numRecs = 0;
+    try {
+      records = new DataFileStream<CompleteRecord>(in, reader);
+      for (CompleteRecord rec : records) {
+        assertEquals(numRecs, rec.id);
+        assertEquals(rec.id + 100L, rec.balance);
+        // CharSequence implementations need not be equal to each other, so
+        // compare the textual content.
+        assertEquals("record" + rec.id, rec.name.toString());
+        numRecs++;
+      }
+    } finally {
+      // Release the file even if opening the stream or an assertion fails.
+      if (records != null) {
+        records.close();
+      } else {
+        in.close();
+      }
+    }
+    assertEquals(5, numRecs);
+  }
+
+}
Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java
------------------------------------------------------------------------------
svn:eol-style = native