You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/19 18:15:23 UTC

svn commit: r1447823 - in /avro/trunk: ./ lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/ lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/ lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/

Author: cutting
Date: Tue Feb 19 17:15:22 2013
New Revision: 1447823

URL: http://svn.apache.org/r1447823
Log:
AVRO-1254. Java: Add support for new mapreduce APIs to Trevni.  Contributed by Ted Malaska.

Added:
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java   (with props)
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java
    avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 19 17:15:22 2013
@@ -18,6 +18,9 @@ Trunk (not yet released)
     AVRO-1253. Java: Add support for bzip2 file compression to Trevni.
     (Ted Malaska via cutting)
 
+    AVRO-1254. Java: Add support for new mapreduce APIs to Trevni.
+    (Ted Malaska via cutting)
+
   IMPROVEMENTS
 
     AVRO-1211. Add MR guide to documentation. (Skye Wanderman-Milne via

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce {@link org.apache.hadoop.mapreduce.InputFormat} that reads Trevni
+ * container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyInputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Keys are {@link AvroKey} wrapper objects that contain the Trevni data.  Since
+ * Trevni container files store only records (not key/value pairs), the value
+ * from this InputFormat is a {@link NullWritable}.</p>
+ *
+ * <p>A subset schema to be read may be specified with
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputKeySchema}.</p>
+ *
+ * @param <T> The (java) type of the Trevni data to read.
+ */
+public class AvroTrevniKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
+
+  /** Returns a new {@link AvroTrevniKeyRecordReader}; the split and context are not consulted here. */
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final RecordReader<AvroKey<T>, NullWritable> reader = new AvroTrevniKeyRecordReader<T>();
+    return reader;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes Avro data to
+ * Trevni container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyOutputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Since Trevni container files only contain records (not key/value pairs),
+ * this output format ignores the value.</p>
+ *
+ * <p>Writes a directory of files per task, each comprising a single filesystem
+ * block.  To reduce the number of files, increase the default filesystem block
+ * size for the job.  Each task also requires enough memory to buffer a
+ * filesystem block.</p>
+ *
+ * @param <T> The (java) type of the Trevni data to write.
+ */
+public class AvroTrevniKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {
+
+  /** Returns a new {@link AvroTrevniKeyRecordWriter} built from the task attempt context. */
+  @Override
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    final RecordWriter<AvroKey<T>, NullWritable> recordWriter = new AvroTrevniKeyRecordWriter<T>(context);
+    return recordWriter;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Reads records from an input split representing a chunk of a Trevni container file.
+ *
+ * @param <T> The (java) type of data in the Trevni container file.
+ */
+public class AvroTrevniKeyRecordReader<T> extends AvroTrevniRecordReaderBase<AvroKey<T>, NullWritable, T> {
+
+  /** Reusable key wrapper; its datum is replaced on every call to {@link #nextKeyValue()}. */
+  private final AvroKey<T> mCurrentKey = new AvroKey<T>();
+
+  /** Returns the shared key wrapper that {@link #nextKeyValue()} last populated. */
+  @Override
+  public AvroKey<T> getCurrentKey() throws IOException, InterruptedException {
+    return mCurrentKey;
+  }
+
+  /** Returns the {@link NullWritable} singleton; this reader produces no value data. */
+  @Override
+  public NullWritable getCurrentValue() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  /**
+   * Asks the base reader to advance, then points the key wrapper at its current record.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    final boolean advanced = super.nextKeyValue();
+    // The wrapper is refreshed whether or not the base reader advanced.
+    mCurrentKey.datum(getCurrentRecord());
+    return advanced;
+  }
+}
+

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes Trevni records to a Trevni container file output stream.
+ *
+ * @param <T> The Java type of the Trevni data to write.
+ */
+public class AvroTrevniKeyRecordWriter<T> extends AvroTrevniRecordWriterBase<AvroKey<T>, NullWritable, T> {
+
+  /**
+   * Constructor.
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   */
+  public AvroTrevniKeyRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    super(context);
+  }
+
+  /** Writes the key's datum (the value is ignored) and flushes once the size estimate reaches blockSize. */
+  @Override
+  public void write(AvroKey<T> key, NullWritable value) throws IOException, InterruptedException {
+    writer.write(key.datum());
+    if (writer.sizeEstimate() >= blockSize) { // block full
+      flush();
+    }
+  }
+
+  /** Returns the map output key schema for a map-only job when one is set, else the job output key schema. */
+  @Override
+  protected Schema initSchema(TaskAttemptContext context) {
+    boolean isMapOnly = context.getNumReduceTasks() == 0;
+    Schema mapSchema = isMapOnly ? AvroJob.getMapOutputKeySchema(context.getConfiguration()) : null;
+    // Fall back so a map-only job that only configured the output key schema still gets a schema.
+    return (mapSchema != null) ? mapSchema : AvroJob.getOutputKeySchema(context.getConfiguration());
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce {@link org.apache.hadoop.mapreduce.InputFormat} that reads Trevni
+ * container files of key/value generic records.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyValueInputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Trevni container files that contain generic records with the two fields
+ * 'key' and 'value' are expected.  The contents of the 'key' field will be used
+ * as the job input key, and the contents of the 'value' field will be used as
+ * the job input value.</p>
+ *
+ * <p>A subset schema to be read may be specified with
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputKeySchema} and
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputValueSchema}.</p>
+ *
+ * @param <K> The type of the Trevni key to read.
+ * @param <V> The type of the Trevni value to read.
+ */
+public class AvroTrevniKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
+
+  /**
+   * Creates the reader for a split.  The reader is constructed without
+   * arguments; neither the split nor the context is consulted here.
+   *
+   * @return a new {@link AvroTrevniKeyValueRecordReader}
+   */
+  @Override
+  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final RecordReader<AvroKey<K>, AvroValue<V>> reader = new AvroTrevniKeyValueRecordReader<K, V>();
+    return reader;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes Avro
+ * key/value pairs to Trevni container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyValueOutputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Since Trevni container files can only contain records (not key/value pairs), this
+ * output format puts the key and value into an Avro generic record with two fields, named
+ * 'key' and 'value'.</p>
+ *
+ * <p>The keys and values given to this output format are Avro objects wrapped in
+ * <code>AvroKey</code> and <code>AvroValue</code> objects.  NOTE(review): earlier
+ * documentation also promised support for the basic Writable types (e.g., IntWritable,
+ * Text); the type signature only admits the Avro wrappers, so verify before relying on it.</p>
+ *
+ * <p>Writes a directory of files per task, each comprising a single filesystem
+ * block.  To reduce the number of files, increase the default filesystem block
+ * size for the job.  Each task also requires enough memory to buffer a
+ * filesystem block.</p>
+ *
+ * @param <K> The type of the datum wrapped in the <code>AvroKey</code>.
+ * @param <V> The type of the datum wrapped in the <code>AvroValue</code>.
+ */
+public class AvroTrevniKeyValueOutputFormat<K, V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
+
+  /** Returns a new {@link AvroTrevniKeyValueRecordWriter} built from the task attempt context. */
+  @Override
+  public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(
+      TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final RecordWriter<AvroKey<K>, AvroValue<V>> recordWriter = new AvroTrevniKeyValueRecordWriter<K, V>(context);
+    return recordWriter;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+
+/**
+ * Reads Trevni generic records from a Trevni container file, where the records contain two
+ * fields: 'key' and 'value'.
+ *
+ * <p>The contents of the 'key' field are exposed through an AvroKey object. The contents
+ * of the 'value' field are exposed through an AvroValue object.</p>
+ *
+ * @param <K> The type of the Avro key to read.
+ * @param <V> The type of the Avro value to read.
+ */
+public class AvroTrevniKeyValueRecordReader<K, V> extends AvroTrevniRecordReaderBase<AvroKey<K>, AvroValue<V>, GenericRecord> {
+
+  /** Reusable wrapper holding the key of the record most recently read. */
+  private final AvroKey<K> mCurrentKey = new AvroKey<K>();
+  /** Reusable wrapper holding the value of the record most recently read. */
+  private final AvroValue<V> mCurrentValue = new AvroValue<V>();
+
+  /** Returns the shared key wrapper that {@link #nextKeyValue()} last populated. */
+  @Override
+  public AvroKey<K> getCurrentKey() throws IOException, InterruptedException {
+    return mCurrentKey;
+  }
+
+  /** Returns the shared value wrapper that {@link #nextKeyValue()} last populated. */
+  @Override
+  public AvroValue<V> getCurrentValue() throws IOException, InterruptedException {
+    return mCurrentValue;
+  }
+
+  /** Advances the base reader and, only if a record was read, loads its 'key' and 'value' fields. */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!super.nextKeyValue()) {
+      return false; // nothing read: do not unwrap a record that may be absent (e.g. empty input)
+    }
+    AvroKeyValue<K, V> pair = new AvroKeyValue<K, V>(getCurrentRecord());
+    mCurrentKey.datum(pair.getKey());
+    mCurrentValue.datum(pair.getValue());
+    return true;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes key/value pairs to a Trevni container file.
+ *
+ * <p>Each entry in the Trevni container file is a generic record with two fields,
+ * named 'key' and 'value', filled from the datum held by the incoming
+ * <code>AvroKey</code> and <code>AvroValue</code>.  The schemas of the two fields come
+ * from {@link AvroDatumConverter}s created for the job's output key and value
+ * classes.</p>
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public class AvroTrevniKeyValueRecordWriter<K, V> extends AvroTrevniRecordWriterBase<AvroKey<K>, AvroValue<V>, GenericRecord> {
+
+  /** Schema of the generic key/value record written for every entry. */
+  Schema mKeyValuePairSchema;
+
+  /** Reusable key/value view over a single generic record; refilled on every write. */
+  AvroKeyValue<Object, Object> keyValueRecord;
+
+  /** Converter created for the job's output key class; supplies the key writer schema. */
+  AvroDatumConverter<K, ?> keyConverter;
+
+  /** Converter created for the job's output value class; supplies the value writer schema. */
+  AvroDatumConverter<V, ?> valueConverter;
+
+  /**
+   * Constructor.
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   */
+  public AvroTrevniKeyValueRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    super(context);
+    // Compute the pair schema here and back the reusable record with it.
+    mKeyValuePairSchema = initSchema(context);
+    keyValueRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
+  }
+
+  /** Copies the key and value datums into the reusable record, writes it, and flushes a full block. */
+  @Override
+  public void write(AvroKey<K> key, AvroValue<V> value)
+      throws IOException, InterruptedException {
+    keyValueRecord.setKey(key.datum());
+    keyValueRecord.setValue(value.datum());
+    writer.write(keyValueRecord.get());
+    if (writer.sizeEstimate() >= blockSize) { // block full
+      flush();
+    }
+  }
+
+  /** Creates the key and value converters and returns the key/value pair schema built from them. */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Schema initSchema(TaskAttemptContext context) {
+    final AvroDatumConverterFactory factory =
+        new AvroDatumConverterFactory(context.getConfiguration());
+
+    // The converters are stored in fields as a side effect of this call.
+    keyConverter = factory.create((Class<K>) context.getOutputKeyClass());
+    valueConverter = factory.create((Class<V>) context.getOutputValueClass());
+
+    // Create the generic record schema for the key/value pair
+    // from the writer schemas the two converters report.
+    final Schema keySchema = keyConverter.getWriterSchema();
+    final Schema valueSchema = valueConverter.getWriterSchema();
+    return AvroKeyValue.getSchema(keySchema, valueSchema);
+  }
+
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.HadoopInput;
+
+/**
+ * Abstract base class for <code>RecordReader</code>s that read Trevni container files.
+ *
+ * @param <K> The type of key the record reader should generate.
+ * @param <V> The type of value the record reader should generate.
+ * @param <T> The type of the entries within the Trevni container file being read.
+ */
+public abstract class AvroTrevniRecordReaderBase<K, V, T> extends RecordReader<K, V> {
+  
+  /** The Trevni file reader; stays null until initialize() has created it */
+  private AvroColumnReader<T> reader;
+  
+  /** Number of rows in the Trevni file */
+  private long rows;
+  
+  /** Number of rows handed out so far by nextKeyValue() */
+  private long row;
+  
+  /** The record most recently read from the Trevni container file. */
+  private T mCurrentRecord;
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final FileSplit file = (FileSplit)inputSplit;
+    context.setStatus(file.toString());
+
+    final AvroColumnReader.Params params =
+      new AvroColumnReader.Params(new HadoopInput(file.getPath(), context.getConfiguration()));
+    params.setModel(ReflectData.get());
+    
+    if (AvroJob.getInputKeySchema(context.getConfiguration()) != null) {
+      params.setSchema(AvroJob.getInputKeySchema(context.getConfiguration()));
+    }
+    
+    reader = new AvroColumnReader<T>(params);
+    rows = reader.getRowCount();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!reader.hasNext())
+      return false;
+    mCurrentRecord = reader.next();
+    row++;
+    return true;
+  }
+  
+  /**
+   * Gets the current record read from the Trevni container file.
+   *
+   * <p>Calling <code>nextKeyValue()</code> moves this to the next record.</p>
+   *
+   * @return The current Trevni record (may be null if no record has been read).
+   */
+  protected T getCurrentRecord() {
+    return mCurrentRecord;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close() throws IOException {
+    if (reader != null) reader.close(); // initialize() may have failed or never run
+  }
+
+  /** {@inheritDoc} Reports 0 for a file with no rows instead of NaN (0/0). */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return rows == 0 ? 0.0f : (float) row / rows;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.MetaData;
+import org.apache.trevni.avro.AvroColumnWriter;
+
+/**
+ * Abstract base class for <code>RecordWriter</code>s that write Trevni container files.
+ *
+ * @param <K> The type of key the record writer should generate.
+ * @param <V> The type of value the record writer should generate.
+ * @param <T> The type of the entries within the Trevni container file being written.
+ */
+public abstract class AvroTrevniRecordWriterBase<K,V, T> extends RecordWriter<K, V> {
+  
+  /** trevni file extension */
+  public static final String EXT = ".trv";
+  
+  /** prefix of job configs that we care about */
+  public static final String META_PREFIX = "trevni.meta.";
+  
+  /** Index of the next part file; flush() increments it every time it writes a file.
+   * A subclass typically calls flush() when writer.sizeEstimate() reaches blockSize.
+   * */
+  protected int part = 0;
+
+  /** Trevni file writer */
+  protected AvroColumnWriter<T> writer;
+
+  /** This will be a unique directory linked to the task */
+  final Path dirPath;
+  
+  /** File system that holds dirPath */
+  final FileSystem fs;
+
+  /** Default block size reported by fs */
+  final long blockSize;
+  
+  /** Avro schema returned by initSchema() */
+  protected Schema schema;
+  
+  /** meta data to be stored in the output file.  */
+  protected ColumnFileMetaData meta;
+  
+  /**
+   * Constructor. Calls the overridable initSchema() before any subclass constructor body has run.
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   */
+  public AvroTrevniRecordWriterBase(TaskAttemptContext context) throws IOException {
+    
+    schema = initSchema(context);
+    meta = filterMetadata(context.getConfiguration());
+    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+
+    Path outputPath = FileOutputFormat.getOutputPath(context);
+    
+    String dir = FileOutputFormat.getUniqueFile(context, "part", "");
+    dirPath = new Path(outputPath, dir); // parent/child form; string concat breaks on a root output path
+    fs = dirPath.getFileSystem(context.getConfiguration());
+    fs.mkdirs(dirPath);
+
+    blockSize = fs.getDefaultBlockSize();
+  }
+
+  /**
+   * Use the task context to construct a schema for writing
+   * @return the schema passed to every AvroColumnWriter this instance creates
+   */
+  abstract protected  Schema initSchema(TaskAttemptContext context); 
+  
+  /**
+   * Writes the current writer's contents to a new part file under dirPath, then starts a fresh writer
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    OutputStream out = fs.create(new Path(dirPath, "part-" + (part++) + EXT));
+    try {
+      writer.writeTo(out);
+    } finally {
+      out.close();
+    }
+    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    flush();
+  }
+  
+  /**
+   * Copies every configuration entry whose key starts with META_PREFIX into new file
+   * meta data, with the prefix stripped and the value encoded using MetaData.UTF8.
+   */
+  static ColumnFileMetaData filterMetadata(final Configuration configuration) {
+    final ColumnFileMetaData meta = new ColumnFileMetaData();
+    for (Entry<String, String> confEntry : configuration) {
+      final String key = confEntry.getKey();
+      if (key.startsWith(META_PREFIX))
+        meta.put(key.substring(META_PREFIX.length()), confEntry.getValue().getBytes(MetaData.UTF8));
+    }
+    return meta;
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java Tue Feb 19 17:15:22 2013
@@ -50,7 +50,6 @@ import org.apache.avro.Schema;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
-import static org.apache.trevni.avro.WordCountUtil.DIR;
 
 public class TestWordCount {
 
@@ -82,14 +81,16 @@ public class TestWordCount {
     testInputFormat();
   }
 
-  private static final Schema STRING = Schema.create(Schema.Type.STRING);
+  static final Schema STRING = Schema.create(Schema.Type.STRING);
   static { GenericData.setStringType(STRING, GenericData.StringType.String); }
-  private static final Schema LONG = Schema.create(Schema.Type.LONG);
+  static final Schema LONG = Schema.create(Schema.Type.LONG);
 
   public void testOutputFormat() throws Exception {
     JobConf job = new JobConf();
     
-    WordCountUtil.writeLinesFile();
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapredTest");
+    
+    wordCountUtil.writeLinesFile();
     
     AvroJob.setInputSchema(job, STRING);
     AvroJob.setOutputSchema(job, Pair.getPairSchema(STRING,LONG));
@@ -98,15 +99,15 @@ public class TestWordCount {
     AvroJob.setCombinerClass(job, ReduceImpl.class);
     AvroJob.setReducerClass(job, ReduceImpl.class);
     
-    FileInputFormat.setInputPaths(job, new Path(DIR + "/in"));
-    FileOutputFormat.setOutputPath(job, new Path(DIR + "/out"));
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
     FileOutputFormat.setCompressOutput(job, true);
     
     job.setOutputFormat(AvroTrevniOutputFormat.class);
 
     JobClient.runJob(job);
     
-    WordCountUtil.validateCountsFile();
+    wordCountUtil.validateCountsFile();
   }
 
   private static long total;
@@ -121,6 +122,9 @@ public class TestWordCount {
   public void testInputFormat() throws Exception {
     JobConf job = new JobConf();
 
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapredTest");
+    
+    
     Schema subSchema = Schema.parse("{\"type\":\"record\"," +
                                     "\"name\":\"PairValue\","+
                                     "\"fields\": [ " + 
@@ -128,7 +132,7 @@ public class TestWordCount {
                                     "]}");
     AvroJob.setInputSchema(job, subSchema);
     AvroJob.setMapperClass(job, Counter.class);        
-    FileInputFormat.setInputPaths(job, new Path(DIR + "/out/*"));
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
     job.setInputFormat(AvroTrevniInputFormat.class);
 
     job.setNumReduceTasks(0);                     // map-only

Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java Tue Feb 19 17:15:22 2013
@@ -41,9 +41,11 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.DataFileStream;
@@ -51,12 +53,20 @@ import org.apache.avro.mapred.Pair;
 
 public class WordCountUtil {
 
-  public static final File DIR = new File("target", "wc");
-  public static final File LINES_FILE
-    = new File(new File(DIR, "in"), "lines.avro");
-  static final File COUNTS_FILE
-    = new File(new File(DIR, "out"), "part-00000/part-0.trv");
+  public File dir;
+  public File linesFiles;
+  public File countFiles;
 
+  public WordCountUtil (String testName) {
+    this(testName, "part-00000");
+  }
+  
+  public WordCountUtil (String testName, String partDirName) {
+    dir = new File("target/wc", testName);
+    linesFiles = new File(new File(dir, "in"), "lines.avro");
+    countFiles = new File(new File(dir, "out"), partDirName + "/part-0.trv");
+  }
+  
   public static final String[] LINES = new String[] {
     "the quick brown fox jumps over the lazy dog",
     "the cow jumps over the moon",
@@ -80,21 +90,25 @@ public class WordCountUtil {
     TOTAL = total;
   }
 
-  public static void writeLinesFile() throws IOException {
-    FileUtil.fullyDelete(DIR);
+  public File getDir() {
+    return dir;
+  }
+  
+  public void writeLinesFile() throws IOException {
+    FileUtil.fullyDelete(dir);
     DatumWriter<String> writer = new GenericDatumWriter<String>();
     DataFileWriter<String> out = new DataFileWriter<String>(writer);
-    LINES_FILE.getParentFile().mkdirs();
-    out.create(Schema.create(Schema.Type.STRING), LINES_FILE);
+    linesFiles.getParentFile().mkdirs();
+    out.create(Schema.create(Schema.Type.STRING), linesFiles);
     for (String line : LINES)
       out.append(line);
     out.close();
   }
 
-  public static void validateCountsFile() throws Exception {
+  public void validateCountsFile() throws Exception {
     AvroColumnReader<Pair<String,Long>> reader =
       new AvroColumnReader<Pair<String,Long>>
-      (new AvroColumnReader.Params(COUNTS_FILE).setModel(SpecificData.get()));
+      (new AvroColumnReader.Params(countFiles).setModel(SpecificData.get()));
     int numWords = 0;
     for (Pair<String,Long> wc : reader) {
       assertEquals(wc.key(), COUNTS.get(wc.key()), wc.value());
@@ -103,5 +117,19 @@ public class WordCountUtil {
     reader.close();
     assertEquals(COUNTS.size(), numWords);
   }
+  
+  /** Checks each key/value record of the counts file against COUNTS; always closes the reader. */
+  public void validateCountsFileGenericRecord() throws Exception {
+    AvroColumnReader<GenericRecord> reader = new AvroColumnReader<GenericRecord>
+      (new AvroColumnReader.Params(countFiles).setModel(SpecificData.get()));
+    int numWords = 0;
+    try {
+      for (GenericRecord wc : reader) {
+        assertEquals((String)wc.get("key"), COUNTS.get(wc.get("key")), (Long)wc.get("value"));
+        numWords++;
+      }
+    } finally { reader.close(); } // release the file even when an assertion above fails
+    assertEquals(COUNTS.size(), numWords);
+  }
 
 }

Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.trevni.avro.WordCountUtil;
+import org.junit.Test;
+
+public class TestKeyValueWordCount {
+  
+  private static long total = 0;
+
+  static final Schema STRING = Schema.create(Schema.Type.STRING);
+  static { GenericData.setStringType(STRING, GenericData.StringType.String); }
+  static final Schema LONG = Schema.create(Schema.Type.LONG);
+  
+  private static class WordCountMapper extends
+      Mapper<AvroKey<String>, NullWritable, Text, LongWritable> {
+    private LongWritable mCount = new LongWritable();
+    private Text mText = new Text();
+
+    @Override
+    protected void setup(Context context) {
+      mCount.set(1);
+    }
+
+    @Override
+    protected void map(AvroKey<String> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      try {
+        StringTokenizer tokens = new StringTokenizer(key.datum());
+        while (tokens.hasMoreTokens()) {
+          mText.set(tokens.nextToken());
+          context.write(mText, mCount);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(key + " " + key.datum(), e);
+      }
+
+    }
+  }
+  
+  private static class WordCountReducer extends Reducer< Text, LongWritable, AvroKey<String>, AvroValue<Long>> {
+    
+    AvroKey<String> resultKey = new AvroKey<String>();
+    AvroValue<Long> resultValue = new AvroValue<Long>();
+    
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+      long sum = 0;
+      for (LongWritable value: values) {
+        sum += value.get();
+      }
+      resultKey.datum(key.toString());
+      resultValue.datum(sum);
+      
+      context.write(resultKey, resultValue);
+    }
+  }
+   
+  public static class Counter extends
+  Mapper<AvroKey<String>, AvroValue<Long>, NullWritable, NullWritable> {
+    @Override
+    protected void map(AvroKey<String> key, AvroValue<Long> value, Context context)
+        throws IOException, InterruptedException {
+      total += value.datum();
+    }
+  }  
+  
+  @Test
+  public void testKeyOutputFormat() throws Exception {
+    Job job = new Job();
+    
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyValueTest", "part-r-00000");
+     
+    wordCountUtil.writeLinesFile();
+    
+    AvroJob.setInputKeySchema(job, STRING);
+    AvroJob.setOutputKeySchema(job, STRING);
+    AvroJob.setOutputValueSchema(job, LONG);
+    
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(WordCountReducer.class);
+    
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setOutputFormatClass(AvroTrevniKeyValueOutputFormat.class);
+
+    job.waitForCompletion(true);
+    
+    wordCountUtil.validateCountsFileGenericRecord();
+  }
+  
+  @Test
+  public void testInputFormat() throws Exception {
+    Job job = new Job();
+    
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyValueTest");
+    
+    job.setMapperClass(Counter.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
+    job.setInputFormatClass(AvroTrevniKeyValueInputFormat.class);
+    
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    
+    total = 0;
+    job.waitForCompletion(true);
+    assertEquals(WordCountUtil.TOTAL, total);
+    
+  }
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.trevni.avro.WordCountUtil;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
+import org.junit.Test;
+
+public class TestKeyWordCount {
+  
+  private static long total = 0;
+
+  static final Schema STRING = Schema.create(Schema.Type.STRING);
+  static { GenericData.setStringType(STRING, GenericData.StringType.String); }
+  static final Schema LONG = Schema.create(Schema.Type.LONG);
+  
+  
+  private static class WordCountMapper extends
+      Mapper<AvroKey<String>, NullWritable, Text, LongWritable> {
+    private LongWritable mCount = new LongWritable();
+    private Text mText = new Text();
+
+    @Override
+    protected void setup(Context context) {
+      mCount.set(1);
+    }
+
+    @Override
+    protected void map(AvroKey<String> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      try {
+        StringTokenizer tokens = new StringTokenizer(key.datum());
+        while (tokens.hasMoreTokens()) {
+          mText.set(tokens.nextToken());
+          context.write(mText, mCount);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(key + " "  + key.datum() , e);
+      }
+
+    }
+  }
+  
+  private static class WordCountReducer extends Reducer< Text, LongWritable, AvroKey<GenericData.Record>, NullWritable> {
+    
+    private AvroKey<GenericData.Record> result ;
+    
+    @Override
+    protected void setup(Context context) {
+      result = new AvroKey<GenericData.Record>();
+      result.datum(new Record(Pair.getPairSchema(STRING,LONG)));
+    }
+    
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+      long count = 0;
+      for (LongWritable value: values) {
+        count += value.get();
+      }
+      
+      result.datum().put("key", key.toString());
+      result.datum().put("value", count);
+      
+      context.write(result, NullWritable.get());
+    }
+  }
+   
+
+  
+  public static class Counter extends
+  Mapper<AvroKey<GenericData.Record>, NullWritable, NullWritable, NullWritable> {
+    @Override
+    protected void map(AvroKey<GenericData.Record> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+      total += (Long)key.datum().get("value");
+    }
+  }
+  
+  
+  @Test
+  public void testKeyOutputFormat() throws Exception {
+    Job job = new Job();
+    
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyTest", "part-r-00000");
+    
+    wordCountUtil.writeLinesFile();
+    
+    AvroJob.setInputKeySchema(job, STRING);
+    AvroJob.setOutputKeySchema(job, Pair.getPairSchema(STRING,LONG));
+    
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(WordCountReducer.class);
+    
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setOutputFormatClass(AvroTrevniKeyOutputFormat.class);
+
+    job.waitForCompletion(true);
+    
+    wordCountUtil.validateCountsFile();
+  }
+  
+  @Test
+  public void testInputFormat() throws Exception {
+    Job job = new Job();
+    
+    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyTest");
+    
+    job.setMapperClass(Counter.class);
+
+    Schema subSchema = Schema.parse("{\"type\":\"record\"," +
+                                    "\"name\":\"PairValue\","+
+                                    "\"fields\": [ " + 
+                                    "{\"name\":\"value\", \"type\":\"long\"}" + 
+                                    "]}");
+    AvroJob.setInputKeySchema(job, subSchema);
+    
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
+    job.setInputFormatClass(AvroTrevniKeyInputFormat.class);
+    
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    
+    total = 0;
+    job.waitForCompletion(true);
+    assertEquals(WordCountUtil.TOTAL, total);
+    
+  }
+  
+}

Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java
------------------------------------------------------------------------------
    svn:eol-style = native