Posted to commits@avro.apache.org by cu...@apache.org on 2014/02/05 00:34:03 UTC

svn commit: r1564562 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/

Author: cutting
Date: Tue Feb  4 23:34:02 2014
New Revision: 1564562

URL: http://svn.apache.org/r1564562
Log:
AVRO-1439. Java: Add AvroMultipleInputs for mapred.  Contributed by Harsh J.

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java   (with props)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java   (with props)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java   (with props)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java   (with props)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java   (with props)
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1564562&r1=1564561&r2=1564562&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb  4 23:34:02 2014
@@ -4,6 +4,8 @@ Trunk (not yet released)
 
   NEW FEATURES
 
+    AVRO-1439. Java: Add AvroMultipleInputs for mapred. (Harsh J via cutting)
+
   OPTIMIZATIONS
 
   IMPROVEMENTS

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class supports Avro-MapReduce jobs that have multiple input paths with
+ * a different {@link Schema} and {@link AvroMapper} for each path.
+ *
+ * <p>
+ * Usage:
+ * </p>
+ * <p>
+ * <strong>Case 1: (ReflectData based inputs)</strong>
+ * </p>
+ * <pre>
+ * // Enable ReflectData usage across job.
+ * AvroJob.setReflect(job);
+ *
+ * Schema type1Schema = ReflectData.get().getSchema(Type1Record.class);
+ * AvroMultipleInputs.addInputPath(job, inputPath1, type1Schema, Type1AvroMapper.class);
+ * </pre>
+ *
+ * Where Type1AvroMapper would be implemented as
+ * <pre>
+ *  class Type1AvroMapper extends AvroMapper&lt;Type1Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <pre>
+ * Schema type2Schema = ReflectData.get().getSchema(Type2Record.class);
+ * AvroMultipleInputs.addInputPath(job, inputPath2, type2Schema, Type2AvroMapper.class);
+ * </pre>
+ *
+ * Where Type2AvroMapper would be implemented as
+ * <pre>
+ *  class Type2AvroMapper extends AvroMapper&lt;Type2Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <p>
+ * <strong>Case 2: (SpecificData based inputs)</strong>
+ * </p>
+ *
+ * <pre>
+ * Schema type1Schema = Type1Record.SCHEMA$;
+ * AvroMultipleInputs.addInputPath(job, inputPath1, type1Schema, Type1AvroMapper.class);
+ * </pre>
+ *
+ * Where Type1AvroMapper would be implemented as
+ * <pre>
+ *  class Type1AvroMapper extends AvroMapper&lt;Type1Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <pre>
+ * Schema type2Schema = Type2Record.SCHEMA$;
+ * AvroMultipleInputs.addInputPath(job, inputPath2, type2Schema, Type2AvroMapper.class);
+ * </pre>
+ *
+ * Where Type2AvroMapper would be implemented as
+ * <pre>
+ *  class Type2AvroMapper extends AvroMapper&lt;Type2Record, Pair&lt;ComparingKeyRecord, CommonValueRecord&gt;&gt;
+ * </pre>
+ *
+ * <p>
+ * <strong>Note on InputFormat:</strong>
+ *   The InputFormat used will always be {@link AvroInputFormat} when using this class.
+ * </p>
+ * <p>
+ * <strong>Note on collector outputs:</strong>
+ *   When using this class, all the mapper implementations involved
+ *   must emit the same key and value record types, as set by
+ *   {@link AvroJob#setOutputSchema(JobConf, Schema)}
+ *   or {@link AvroJob#setMapOutputSchema(JobConf, Schema)}.
+ * </p>
+ */
+public class AvroMultipleInputs {
+  private static String schemaKey =
+      "avro.mapreduce.input.multipleinputs.dir.schemas";
+  private static String mappersKey =
+      "avro.mapreduce.input.multipleinputs.dir.mappers";
+  /**
+   * Add a {@link Path} with a custom {@link Schema} to the list of
+   * inputs for the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputSchema {@link Schema} to use for this path
+   */
+  private static void addInputPath(JobConf conf, Path path,
+      Schema inputSchema) {
+
+    String schemaMapping = path.toString() + ";"
+       + toBase64(inputSchema.toString());
+
+    String schemas = conf.get(schemaKey);
+    conf.set(schemaKey,
+        schemas == null ? schemaMapping : schemas + ","
+            + schemaMapping);
+
+    conf.setInputFormat(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link Schema} and
+   * {@link AvroMapper} to the list of inputs for the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param mapperClass {@link AvroMapper} class to use for this path
+   * @param inputSchema {@link Schema} to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends AvroMapper> mapperClass,
+      Schema inputSchema) {
+
+    addInputPath(conf, path, inputSchema);
+
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get(mappersKey);
+    conf.set(mappersKey, mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    conf.setMapperClass(DelegatingMapper.class);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link AvroMapper} class that
+   * should be used for them.
+   *
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Schema)
+   * @return A map of paths-to-mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends AvroMapper>> getMapperTypeMap(JobConf conf) {
+    if (conf.get(mappersKey) == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends AvroMapper>> m = new HashMap<Path, Class<? extends AvroMapper>>();
+    String[] pathMappings = conf.get(mappersKey).split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends AvroMapper> mapClass;
+      try {
+       mapClass = (Class<? extends AvroMapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Schema} that
+   * should be used for them.
+   *
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Schema)
+   * @return A map of paths to schemas for the job
+   */
+  static Map<Path, Schema> getInputSchemaMap(JobConf conf) {
+    if (conf.get(schemaKey) == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Schema> m = new HashMap<Path, Schema>();
+    String[] schemaMappings =
+        conf.get(schemaKey).split(",");
+    Schema.Parser schemaParser = new Schema.Parser();
+    for (String schemaMapping : schemaMappings) {
+      String[] split = schemaMapping.split(";");
+      String schemaString = fromBase64(split[1]);
+      Schema inputSchema;
+      try {
+       inputSchema = schemaParser.parse(schemaString);
+      } catch (SchemaParseException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputSchema);
+    }
+    return m;
+  }
+
+  private static String toBase64(String rawString) {
+    Base64 base64decoder = new Base64();
+    return new String(base64decoder.encode(rawString.getBytes()));
+  }
+
+  private static String fromBase64(String base64String) {
+    Base64 base64decoder = new Base64();
+    return new String(base64decoder.decode(base64String));
+  }
+
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleInputs.java
------------------------------------------------------------------------------
    svn:eol-style = native
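
For readers new to this API, the class Javadoc above maps onto a driver along
the following lines. This is a minimal sketch only, assuming hypothetical
reflect-based record classes (NamesRecord, BalancesRecord, KeyRecord,
JoinableRecord, CompleteRecord) and mapper/reducer implementations
(NamesMapImpl, BalancesMapImpl, ReduceImpl) like the ones defined in the new
TestAvroMultipleInputs further down; it is not itself part of the commit.

    import org.apache.avro.Schema;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.avro.mapred.AvroMultipleInputs;
    import org.apache.avro.mapred.Pair;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class MultipleInputsJoinDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.setJobName("multiple-inputs-join");

        // One call per input directory: path, mapper class, input schema.
        AvroMultipleInputs.addInputPath(job, new Path(args[0]),
            NamesMapImpl.class, ReflectData.get().getSchema(NamesRecord.class));
        AvroMultipleInputs.addInputPath(job, new Path(args[1]),
            BalancesMapImpl.class, ReflectData.get().getSchema(BalancesRecord.class));

        // All mappers must emit the same Pair<key, value> schema.
        Schema keySchema = ReflectData.get().getSchema(KeyRecord.class);
        Schema valueSchema = ReflectData.get().getSchema(JoinableRecord.class);
        AvroJob.setMapOutputSchema(job, Pair.getPairSchema(keySchema, valueSchema));
        AvroJob.setOutputSchema(job, ReflectData.get().getSchema(CompleteRecord.class));

        AvroJob.setReducerClass(job, ReduceImpl.class);
        job.setNumReduceTasks(1);
        AvroJob.setReflect(job);   // reflect-based (de)serialization across the job
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        JobClient.runJob(job);
      }
    }

Each addInputPath() call registers a (path, mapper, schema) triple and switches
the job to DelegatingInputFormat and DelegatingMapper (below), which dispatch
per input split.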

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates reads for each input path to
+ * {@link AvroInputFormat}, configured with the Avro {@link Schema}
+ * associated with that path.
+ * @see AvroMultipleInputs#addInputPath(JobConf, Path, Class, Schema)
+ */
+class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
+
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+
+    JobConf confCopy = new JobConf(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    Map<Path, Class<? extends AvroMapper>> mapperMap = AvroMultipleInputs
+       .getMapperTypeMap(conf);
+    Map<Path, Schema> schemaMap = AvroMultipleInputs
+        .getInputSchemaMap(conf);
+    Map<Schema, List<Path>> schemaPaths
+        = new HashMap<Schema, List<Path>>();
+
+    // First, build a map of Schemas to Paths
+    for (Entry<Path, Schema> entry : schemaMap.entrySet()) {
+      if (!schemaPaths.containsKey(entry.getValue())) {
+        schemaPaths.put(entry.getValue(), new LinkedList<Path>());
+      }
+
+      schemaPaths.get(entry.getValue()).add(entry.getKey());
+    }
+
+    for (Entry<Schema, List<Path>> schemaEntry :
+        schemaPaths.entrySet()) {
+      Schema schema = schemaEntry.getKey();
+      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+         AvroInputFormat.class, conf);
+      List<Path> paths = schemaEntry.getValue();
+
+      Map<Class<? extends AvroMapper>, List<Path>> mapperPaths
+          = new HashMap<Class<? extends AvroMapper>, List<Path>>();
+
+      // Now, for each set of paths that have a common Schema, build
+      // a map of Mappers to the paths they're used for
+      for (Path path : paths) {
+       Class<? extends AvroMapper> mapperClass = mapperMap.get(path);
+       if (!mapperPaths.containsKey(mapperClass)) {
+         mapperPaths.put(mapperClass, new LinkedList<Path>());
+       }
+
+       mapperPaths.get(mapperClass).add(path);
+      }
+
+      // Now each set of paths that has a common InputFormat and Mapper can
+      // be added to the same job, and split together.
+      for (Entry<Class<? extends AvroMapper>, List<Path>> mapEntry : mapperPaths
+         .entrySet()) {
+       paths = mapEntry.getValue();
+       Class<? extends AvroMapper> mapperClass = mapEntry.getKey();
+
+       if (mapperClass == null) {
+         mapperClass = (Class<? extends AvroMapper>) conf.getMapperClass();
+       }
+
+       FileInputFormat.setInputPaths(confCopy, paths.toArray(new Path[paths
+           .size()]));
+
+       // Get splits for each input path and tag with InputFormat
+       // and Mapper types by wrapping in a TaggedInputSplit.
+       InputSplit[] pathSplits = format.getSplits(confCopy, numSplits);
+       for (InputSplit pathSplit : pathSplits) {
+         splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+             mapperClass, schema));
+       }
+      }
+    }
+
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
+
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+
+    // Find the Schema and then build the RecordReader from the
+    // TaggedInputSplit.
+
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    Schema schema = taggedInputSplit.getSchema();
+    AvroJob.setInputSchema(conf, schema);
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+       .newInstance(taggedInputSplit.getInputFormatClass(), conf);
+    return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
+       reporter);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
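
A note on the wire format shared by AvroMultipleInputs and the class above:
addInputPath() stores one comma-separated list per configuration key, each
entry of the form "path;value", with the schema JSON Base64-encoded (Base64
output contains neither ',' nor ';', so the delimiters cannot clash with
characters inside the JSON); getInputSchemaMap() and getMapperTypeMap()
rebuild the maps from those lists. A standalone round-trip sketch, for
illustration only (the path and record schema below are made up):

    import org.apache.avro.Schema;
    import org.apache.commons.codec.binary.Base64;

    public class MultipleInputsEncodingSketch {
      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"R\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"int\"}]}");
        String path = "file:///tmp/in1";

        // What addInputPath() stores under the schemas key:
        // "<path>;<base64(schema JSON)>", entries joined with ','.
        Base64 base64 = new Base64();
        String entry = path + ";"
            + new String(base64.encode(schema.toString().getBytes()));

        // What getInputSchemaMap() does to read one entry back.
        String[] split = entry.split(";");
        Schema decoded =
            new Schema.Parser().parse(new String(base64.decode(split[1])));

        System.out.println(split[0] + " -> " + decoded);
      }
    }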

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates each input path to one of several
+ * {@link AvroMapper}s. Similar to {@link HadoopMapper}, but instantiates the
+ * map class in the map() call instead of during configure(), as we rely on
+ * the split object to provide that information.
+ *
+ * @see AvroMultipleInputs#addInputPath(JobConf, Path, Class, Schema)
+ */
+class DelegatingMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase
+  implements Mapper<AvroWrapper<IN>,NullWritable,KO,VO> {
+  AvroMapper<IN, OUT> mapper;
+  JobConf conf;
+  boolean isMapOnly;
+  AvroCollector<OUT> out;
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    this.isMapOnly = conf.getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void map(AvroWrapper<IN> wrapper, NullWritable value,
+      OutputCollector<KO, VO> collector, Reporter reporter)
+          throws IOException {
+    if (mapper == null) {
+      TaggedInputSplit is = (TaggedInputSplit) reporter.getInputSplit();
+      Class<? extends AvroMapper> mapperClass = is.getMapperClass();
+      mapper = (AvroMapper<IN,OUT>)
+          ReflectionUtils.newInstance(mapperClass, conf);
+    }
+    if (out == null)
+      out = new MapCollector<OUT,K,V,KO,VO>(collector, isMapOnly);
+    mapper.map(wrapper.datum(), out, reporter);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/DelegatingMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1564562&r1=1564561&r2=1564562&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Tue Feb  4 23:34:02 2014
@@ -35,7 +35,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
   implements Mapper<AvroWrapper<IN>, NullWritable, KO, VO> {
     
   private AvroMapper<IN,OUT> mapper;
-  private MapCollector out;
+  private MapCollector<OUT,K,V,KO,VO> out;
   private boolean isMapOnly;
 
   @Override @SuppressWarnings("unchecked")
@@ -47,36 +47,12 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
     this.isMapOnly = conf.getNumReduceTasks() == 0;
   }
 
-  @SuppressWarnings("unchecked")
-  private class MapCollector extends AvroCollector<OUT> {
-    private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
-    private final AvroKey<K> keyWrapper = new AvroKey(null);
-    private final AvroValue<V> valueWrapper = new AvroValue(null);
-    private OutputCollector<KO,VO> collector;
-
-    public MapCollector(OutputCollector<KO,VO> collector) {
-      this.collector = collector;
-    }
-
-    public void collect(OUT datum) throws IOException {
-      if (isMapOnly) {
-        wrapper.datum(datum);
-        collector.collect((KO)wrapper, (VO)NullWritable.get());
-      } else {                                    // split a pair
-        Pair<K,V> pair = (Pair<K,V>)datum;
-        keyWrapper.datum(pair.key());
-        valueWrapper.datum(pair.value());
-        collector.collect((KO)keyWrapper, (VO)valueWrapper);
-      }
-    }
-  }
-
   @Override
   public void map(AvroWrapper<IN> wrapper, NullWritable value, 
                   OutputCollector<KO,VO> collector, 
                   Reporter reporter) throws IOException {
     if (this.out == null)
-      this.out = new MapCollector(collector);
+      this.out = new MapCollector<OUT,K,V,KO,VO>(collector, isMapOnly);
     mapper.map(wrapper.datum(), out, reporter);
   }
 

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+@SuppressWarnings("unchecked")
+class MapCollector<OUT,K,V,KO,VO> extends AvroCollector<OUT> {
+  private final AvroWrapper<OUT> wrapper = new AvroWrapper<OUT>(null);
+  private final AvroKey<K> keyWrapper = new AvroKey<K>(null);
+  private final AvroValue<V> valueWrapper = new AvroValue<V>(null);
+  private OutputCollector<KO,VO> collector;
+  private boolean isMapOnly;
+
+  public MapCollector(OutputCollector<KO,VO> collector, boolean isMapOnly) {
+    this.collector = collector;
+    this.isMapOnly = isMapOnly;
+  }
+
+  public void collect(OUT datum) throws IOException {
+    if (isMapOnly) {
+      wrapper.datum(datum);
+      collector.collect((KO)wrapper, (VO)NullWritable.get());
+    } else {
+      // split a pair
+      Pair<K,V> pair = (Pair<K,V>)datum;
+      keyWrapper.datum(pair.key());
+      valueWrapper.datum(pair.value());
+      collector.collect((KO)keyWrapper, (VO)valueWrapper);
+    }
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/MapCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native
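
The two branches in MapCollector.collect() above determine which schemas a job
has to declare: in a map-only job the OUT datum is wrapped and written as-is,
while with reducers the datum is cast to Pair<K,V> and split into an
AvroKey/AvroValue. A sketch of the corresponding wiring, assuming hypothetical
OutRecord, KeyRecord and ValueRecord reflect classes (illustration only):

    import org.apache.avro.mapred.AvroJob;
    import org.apache.avro.mapred.Pair;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.mapred.JobConf;

    public class MapCollectorWiring {

      /** Map-only: collect() wraps the OUT datum directly, so the job's
       *  output schema is the datum's own schema. */
      static void configureMapOnly(JobConf job) {
        job.setNumReduceTasks(0);
        AvroJob.setOutputSchema(job, ReflectData.get().getSchema(OutRecord.class));
      }

      /** With reducers: collect() casts the OUT datum to Pair<K,V>, so the
       *  map output schema must be the Pair schema of the key and value schemas. */
      static void configureWithReducers(JobConf job) {
        job.setNumReduceTasks(1);
        AvroJob.setMapOutputSchema(job,
            Pair.getPairSchema(ReflectData.get().getSchema(KeyRecord.class),
                               ReflectData.get().getSchema(ValueRecord.class)));
      }
    }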

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+class TaggedInputSplit implements Configurable, InputSplit {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  private Class<? extends InputFormat> inputFormatClass;
+
+  private Class<? extends AvroMapper> mapperClass;
+
+  private Schema schema;
+
+  private Schema.Parser schemaParser = new Schema.Parser();
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // Default constructor.
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   *
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this split
+   * @param mapperClass The AvroMapper class to use for this split
+   * @param inputSchema The Schema to use for this split
+   */
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends AvroMapper> mapperClass,
+      Schema inputSchema) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+    this.schema = inputSchema;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   *
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   *
+   * @return The InputFormat class to use
+   */
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   *
+   * @return The Mapper class to use
+   */
+  public Class<? extends AvroMapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  /**
+   * Retrieves the Schema to use for this split.
+   *
+   * @return The schema for record readers to use
+   */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public long getLength() throws IOException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException {
+    return inputSplit.getLocations();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+       .newInstance(inputSplitClass, conf);
+    inputSplit.readFields(in);
+    inputFormatClass = (Class<? extends InputFormat>) readClass(in);
+    mapperClass = (Class<? extends AvroMapper>) readClass(in);
+    String schemaString = Text.readString(in);
+    schema = schemaParser.parse(schemaString);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    inputSplit.write(out);
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+    Text.writeString(out, schema.toString());
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return inputSplit.toString();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/TaggedInputSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native
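
TaggedInputSplit.write() above serializes the wrapped split's class name, the
split itself, the InputFormat and AvroMapper class names, and the schema JSON;
readFields() resolves the class names through the Configuration, so the split
must have its Configuration set before it is deserialized (ReflectionUtils
does that automatically when instantiating it, since the class is
Configurable). A round-trip sketch under those assumptions; it would have to
live in the org.apache.avro.mapred package, since TaggedInputSplit is
package-private:

    package org.apache.avro.mapred;

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.FileSplit;

    public class TaggedInputSplitRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Schema schema = Schema.create(Schema.Type.STRING);
        FileSplit wrapped = new FileSplit(
            new Path("/tmp/part-00000.avro"), 0L, 1024L, new String[0]);

        TaggedInputSplit split = new TaggedInputSplit(wrapped, conf,
            AvroInputFormat.class, AvroMapper.class, schema);

        // write() records the split class, the wrapped split, both class
        // names and the schema JSON.
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);

        // readFields() resolves class names via the Configuration, so
        // setConf() must happen before deserializing.
        TaggedInputSplit copy = new TaggedInputSplit();
        copy.setConf(conf);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        System.out.println(copy.getSchema());        // "string"
        System.out.println(copy.getMapperClass());   // class org.apache.avro.mapred.AvroMapper
      }
    }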

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java?rev=1564562&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java Tue Feb  4 23:34:02 2014
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestAvroMultipleInputs {
+
+  /** The input-1 record. */
+  public static class NamesRecord {
+    private int id = -1;
+    private CharSequence name = "";
+
+    public NamesRecord() {}
+
+    public NamesRecord(int id, CharSequence name) {
+      this.id = id;
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return id + "\t" + name;
+    }
+  }
+
+  /** The input-2 record. */
+  public static class BalancesRecord {
+    private int id = -1;
+    private long balance = 0L;
+
+    public BalancesRecord() {}
+
+    public BalancesRecord(int id, long balance) {
+      this.id = id;
+      this.balance = balance;
+    }
+
+    @Override
+    public String toString() {
+      return id + "\t" + balance;
+    }
+  }
+
+  /** The map output key record. */
+  public static class KeyRecord {
+    private int id = -1;
+
+    public KeyRecord() {}
+
+    public KeyRecord(int id) {
+      this.id = id;
+    }
+
+    @Override
+    public String toString() {
+      return ((Integer) id).toString();
+    }
+  }
+
+  /** The common map output value record.
+   *  Carries a tag specifying what the source
+   *  record type was.
+   */
+  public static class JoinableRecord {
+    private int id = -1;
+    private CharSequence name = "";
+    private long balance = 0L;
+    private CharSequence recType = "";
+
+    public JoinableRecord() {}
+
+    public JoinableRecord(
+        CharSequence recType,
+        int id,
+        CharSequence name,
+        long balance) {
+      this.id = id;
+      this.recType = recType;
+      this.name = name;
+      this.balance = balance;
+    }
+
+    @Override
+    public String toString() {
+      return recType.toString();
+    }
+  }
+
+  /** The output, combined record. */
+  public static class CompleteRecord {
+    private int id = -1;
+    private CharSequence name = "";
+    private long balance = 0L;
+
+    public CompleteRecord() {}
+
+    public CompleteRecord(int id, CharSequence name, long balance) {
+      this.name = name;
+      this.id = id;
+      this.balance = balance;
+    }
+
+    void setId(int id) { this.id = id; }
+
+    void setName(CharSequence name) { this.name = name; }
+
+    void setBalance(long balance) { this.balance = balance; }
+
+    @Override
+    public String toString() {
+      return id + "\t" + name + "\t" + balance;
+    }
+  }
+
+  public static class NamesMapImpl
+    extends AvroMapper<NamesRecord, Pair<KeyRecord, JoinableRecord>> {
+
+    @Override
+    public void map(
+        NamesRecord nameRecord,
+        AvroCollector<Pair<KeyRecord, JoinableRecord>> collector,
+        Reporter reporter) throws IOException {
+      collector.collect(
+          new Pair<KeyRecord, JoinableRecord>(
+              new KeyRecord(nameRecord.id),
+              new JoinableRecord(nameRecord.getClass().getName(),
+                  nameRecord.id, nameRecord.name, -1L)));
+    }
+
+  }
+
+  public static class BalancesMapImpl
+    extends AvroMapper<BalancesRecord, Pair<KeyRecord, JoinableRecord>> {
+
+    @Override
+    public void map(
+        BalancesRecord balanceRecord,
+        AvroCollector<Pair<KeyRecord, JoinableRecord>> collector,
+        Reporter reporter) throws IOException {
+      collector.collect(
+          new Pair<KeyRecord, JoinableRecord>(
+              new KeyRecord(balanceRecord.id),
+              new JoinableRecord(balanceRecord.getClass().getName(),
+                  balanceRecord.id, "", balanceRecord.balance)));
+    }
+
+  }
+
+  public static class ReduceImpl
+    extends AvroReducer<KeyRecord, JoinableRecord, CompleteRecord> {
+
+    @Override
+    public void reduce(KeyRecord ID, Iterable<JoinableRecord> joinables,
+                       AvroCollector<CompleteRecord> collector,
+                       Reporter reporter) throws IOException {
+      CompleteRecord rec = new CompleteRecord();
+      for (JoinableRecord joinable : joinables) {
+        rec.setId(joinable.id);
+        if (joinable.recType.toString().contains("NamesRecord")) {
+          rec.setName(joinable.name);
+        } else {
+          rec.setBalance(joinable.balance);
+        }
+      }
+      collector.collect(rec);
+    }
+
+  }
+
+  @Test
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") +
+        "target/testAvroMultipleInputs";
+    Path inputPath1 = new Path(dir + "/in1");
+    Path inputPath2 = new Path(dir + "/in2");
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath, true);
+    inputPath1.getFileSystem(job).delete(inputPath1, true);
+    inputPath2.getFileSystem(job).delete(inputPath2, true);
+
+    writeNamesFiles(new File(inputPath1.toUri().getPath()));
+    writeBalancesFiles(new File(inputPath2.toUri().getPath()));
+
+    job.setJobName("multiple-inputs-join");
+    AvroMultipleInputs.addInputPath(job, inputPath1, NamesMapImpl.class,
+        ReflectData.get().getSchema(NamesRecord.class));
+    AvroMultipleInputs.addInputPath(job, inputPath2, BalancesMapImpl.class,
+        ReflectData.get().getSchema(BalancesRecord.class));
+
+    Schema keySchema = ReflectData.get().getSchema(KeyRecord.class);
+    Schema valueSchema = ReflectData.get().getSchema(JoinableRecord.class);
+    AvroJob.setMapOutputSchema(job,
+        Pair.getPairSchema(keySchema, valueSchema));
+    AvroJob.setOutputSchema(job,
+        ReflectData.get().getSchema(CompleteRecord.class));
+
+    AvroJob.setReducerClass(job, ReduceImpl.class);
+    job.setNumReduceTasks(1);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    AvroJob.setReflect(job);
+
+    JobClient.runJob(job);
+
+    validateCompleteFile(new File(new File(dir, "out"), "part-00000.avro"));
+  }
+
+  /**
+   * Writes a "names.avro" file with five sequential <id, name> pairs.
+   */
+  private void writeNamesFiles(File dir) throws IOException {
+    DatumWriter<NamesRecord> writer = new ReflectDatumWriter<NamesRecord>();
+    DataFileWriter<NamesRecord> out = new DataFileWriter<NamesRecord>(writer);
+    File namesFile = new File(dir+"/names.avro");
+    dir.mkdirs();
+    out.create(ReflectData.get().getSchema(NamesRecord.class), namesFile);
+    for (int i=0; i < 5; i++)
+      out.append(new NamesRecord(i, "record"+i));
+    out.close();
+  }
+
+  /**
+   * Writes a "balances.avro" file with five sequential <id, balance> pairs.
+   */
+  private void writeBalancesFiles(File dir) throws IOException {
+    DatumWriter<BalancesRecord> writer =
+        new ReflectDatumWriter<BalancesRecord>();
+    DataFileWriter<BalancesRecord> out =
+        new DataFileWriter<BalancesRecord>(writer);
+    File balancesFile = new File(dir+"/balances.avro");
+    dir.mkdirs();
+    out.create(ReflectData.get().getSchema(BalancesRecord.class), balancesFile);
+    for (int i=0; i < 5; i++)
+      out.append(new BalancesRecord(i, (long) i+100));
+    out.close();
+  }
+
+  private void validateCompleteFile(File file) throws Exception {
+    DatumReader<CompleteRecord> reader =
+        new ReflectDatumReader<CompleteRecord>();
+    InputStream in = new BufferedInputStream(new FileInputStream(file));
+    DataFileStream<CompleteRecord> records =
+        new DataFileStream<CompleteRecord>(in,reader);
+    int numRecs = 0;
+    for (CompleteRecord rec : records) {
+      assertEquals(rec.id, numRecs);
+      assertEquals(rec.balance-100, rec.id);
+      assertEquals(rec.name, "record"+rec.id);
+      numRecs++;
+    }
+    records.close();
+    assertEquals(5, numRecs);
+  }
+
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleInputs.java
------------------------------------------------------------------------------
    svn:eol-style = native