You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/19 18:15:23 UTC
svn commit: r1447823 - in /avro/trunk: ./
lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/
lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/
lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/
Author: cutting
Date: Tue Feb 19 17:15:22 2013
New Revision: 1447823
URL: http://svn.apache.org/r1447823
Log:
AVRO-1254. Java: Add support for new mapreduce APIs to Trevni. Contributed by Ted Malaska.
Added:
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java (with props)
avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java (with props)
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java (with props)
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java
avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 19 17:15:22 2013
@@ -18,6 +18,9 @@ Trunk (not yet released)
AVRO-1253. Java: Add support for bzip2 file compression to Trevni.
(Ted Malaska via cutting)
+ AVRO-1254. Java: Add support for new mapreduce APIs to Trevni.
+ (Ted Malaska via cutting)
+
IMPROVEMENTS
AVRO-1211. Add MR guide to documentation. (Skye Wanderman-Milne via
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce {@link org.apache.hadoop.mapreduce.InputFormat} that reads
+ * Trevni container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyInputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Keys are {@link AvroKey} wrapper objects that contain the Trevni data.
+ * Since Trevni container files store only records (not key/value pairs), the
+ * value from this InputFormat is a {@link NullWritable}.</p>
+ *
+ * <p>A subset schema to be read may be specified with
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputKeySchema}.</p>
+ *
+ * @param <T> The (java) type of the records wrapped by the generated keys.
+ */
+public class AvroTrevniKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
+
+  /**
+   * Supplies a new {@link AvroTrevniKeyRecordReader}.
+   *
+   * <p>Neither argument is consulted here; the reader is constructed with its
+   * no-argument constructor and handed back as is.</p>
+   *
+   * @param split The input split the reader will be asked to read.
+   * @param context The context of the task attempt requesting the reader.
+   * @return A freshly constructed record reader.
+   */
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    final RecordReader<AvroKey<T>, NullWritable> trevniReader =
+        new AvroTrevniKeyRecordReader<T>();
+    return trevniReader;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes Avro data to
+ * Trevni container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyOutputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Since Trevni container files only contain records (not key/value pairs),
+ * the value type of this output format is {@link NullWritable}.</p>
+ *
+ * <p>Writes a directory of files per task, each comprising a single filesystem
+ * block. To reduce the number of files, increase the default filesystem block
+ * size for the job. Each task also requires enough memory to buffer a
+ * filesystem block.</p>
+ *
+ * @param <T> The (java) type of the Trevni data to write.
+ */
+public class AvroTrevniKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {
+
+  /**
+   * Supplies a new {@link AvroTrevniKeyRecordWriter} built from the task context.
+   *
+   * @param context The context of the task attempt requesting the writer.
+   * @return A freshly constructed record writer.
+   * @throws IOException If constructing the writer fails.
+   */
+  @Override
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final RecordWriter<AvroKey<T>, NullWritable> trevniWriter =
+        new AvroTrevniKeyRecordWriter<T>(context);
+    return trevniWriter;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyOutputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Reads records from an input split representing a chunk of a Trevni container file.
+ *
+ * <p>Each record is exposed as the datum of an {@link AvroKey}; the value is
+ * always the {@link NullWritable} singleton.</p>
+ *
+ * @param <T> The (java) type of data in the Trevni container file.
+ */
+public class AvroTrevniKeyRecordReader<T> extends AvroTrevniRecordReaderBase<AvroKey<T>, NullWritable, T> {
+
+  /** The single key wrapper handed to callers; its datum is replaced on every advance. */
+  private final AvroKey<T> currentKey = new AvroKey<T>();
+
+  /**
+   * Returns the reusable key wrapper. The same instance is returned on every
+   * call; only its datum changes.
+   */
+  @Override
+  public AvroKey<T> getCurrentKey() throws IOException,
+      InterruptedException {
+    return currentKey;
+  }
+
+  /** Returns the {@link NullWritable} singleton, since records carry no separate value. */
+  @Override
+  public NullWritable getCurrentValue() throws IOException,
+      InterruptedException {
+    return NullWritable.get();
+  }
+
+  /**
+   * Advances the underlying reader and points the key wrapper at the current record.
+   *
+   * @return Whatever the base class reports for the advance.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    final boolean advanced = super.nextKeyValue();
+    // The datum is refreshed unconditionally, whether or not the advance succeeded.
+    currentKey.datum(getCurrentRecord());
+    return advanced;
+  }
+}
+
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes the datum of each {@link AvroKey} as one record of a Trevni container file.
+ *
+ * @param <T> The Java type of the Trevni data to write.
+ */
+public class AvroTrevniKeyRecordWriter<T> extends AvroTrevniRecordWriterBase<AvroKey<T>, NullWritable, T> {
+
+  /**
+   * Constructor.
+   *
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   * @throws IOException If the base writer cannot be set up.
+   */
+  public AvroTrevniKeyRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    super(context);
+  }
+
+  /**
+   * Appends the key's datum to the writer and flushes once the writer's size
+   * estimate reaches the block size.
+   *
+   * @param key The wrapper holding the record to write.
+   * @param value Not used.
+   */
+  @Override
+  public void write(AvroKey<T> key, NullWritable value) throws IOException,
+      InterruptedException {
+    final T datum = key.datum();
+    writer.write(datum);
+    final boolean blockFull = writer.sizeEstimate() >= blockSize;
+    if (blockFull) {
+      flush();
+    }
+  }
+
+  /**
+   * Picks the writer schema from the job configuration: the map output key
+   * schema when the job has no reduce tasks, the job output key schema otherwise.
+   *
+   * @param context The task attempt context carrying the job configuration.
+   * @return The schema selected as described above.
+   */
+  @Override
+  protected Schema initSchema(TaskAttemptContext context) {
+    // With zero reduce tasks the map output is the job output.
+    if (context.getNumReduceTasks() == 0) {
+      return AvroJob.getMapOutputKeySchema(context.getConfiguration());
+    }
+    return AvroJob.getOutputKeySchema(context.getConfiguration());
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyRecordWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce {@link org.apache.hadoop.mapreduce.InputFormat} that reads from
+ * Trevni container files of key/value generic records.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyValueInputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Trevni container files that contain generic records with the two fields
+ * 'key' and 'value' are expected. The contents of the 'key' field will be used
+ * as the job input key, and the contents of the 'value' field will be used as
+ * the job input value.</p>
+ *
+ * <p>A subset schema to be read may be specified with
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputKeySchema} and
+ * {@link org.apache.avro.mapreduce.AvroJob#setInputValueSchema}.</p>
+ *
+ * @param <K> The type of the Trevni key to read.
+ * @param <V> The type of the Trevni value to read.
+ */
+public class AvroTrevniKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
+
+  /**
+   * Supplies a new {@link AvroTrevniKeyValueRecordReader}.
+   *
+   * <p>Neither argument is consulted here; the reader is constructed with its
+   * no-argument constructor and handed back as is.</p>
+   *
+   * @param split The input split the reader will be asked to read.
+   * @param context The context of the task attempt requesting the reader.
+   * @return A freshly constructed record reader.
+   */
+  @Override
+  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    final RecordReader<AvroKey<K>, AvroValue<V>> trevniReader =
+        new AvroTrevniKeyValueRecordReader<K, V>();
+    return trevniReader;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes Avro
+ * key/value pairs to Trevni container files.
+ *
+ * <p>This implementation is modeled on
+ * {@link org.apache.avro.mapreduce.AvroKeyValueOutputFormat} to allow for an easy
+ * transition between the two.</p>
+ *
+ * <p>Since Trevni container files can only contain records (not key/value pairs), this
+ * output format puts the key and value into an Avro generic record with two fields, named
+ * 'key' and 'value'.</p>
+ *
+ * <p>The keys and values given to this output format are Avro objects wrapped in
+ * <code>AvroKey</code> and <code>AvroValue</code> objects. NOTE(review): earlier
+ * documentation also listed the basic Writable types (e.g., IntWritable, Text) as
+ * supported; the key and value types declared by this class are <code>AvroKey</code>
+ * and <code>AvroValue</code>, so confirm before relying on that.</p>
+ *
+ * <p>Writes a directory of files per task, each comprising a single filesystem
+ * block. To reduce the number of files, increase the default filesystem block
+ * size for the job. Each task also requires enough memory to buffer a
+ * filesystem block.</p>
+ *
+ * @param <K> The type of the datum wrapped by the <code>AvroKey</code>.
+ * @param <V> The type of the datum wrapped by the <code>AvroValue</code>.
+ */
+public class AvroTrevniKeyValueOutputFormat<K, V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
+
+  /**
+   * Supplies a new {@link AvroTrevniKeyValueRecordWriter} built from the task context.
+   *
+   * @param context The context of the task attempt requesting the writer.
+   * @return A freshly constructed record writer.
+   * @throws IOException If constructing the writer fails.
+   */
+  @Override
+  public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final RecordWriter<AvroKey<K>, AvroValue<V>> trevniWriter =
+        new AvroTrevniKeyValueRecordWriter<K, V>(context);
+    return trevniWriter;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueOutputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+
+/**
+ * Reads Trevni generic records from a Trevni container file, where the records contain two
+ * fields: 'key' and 'value'.
+ *
+ * <p>The contents of the 'key' field are exposed through an AvroKey object. The contents
+ * of the 'value' field are exposed through an AvroValue object.</p>
+ *
+ * @param <K> The type of the Avro key to read.
+ * @param <V> The type of the Avro value to read.
+ */
+public class AvroTrevniKeyValueRecordReader<K, V> extends AvroTrevniRecordReaderBase<AvroKey<K>, AvroValue<V>, GenericRecord> {
+
+  /** The current key the reader is on; one reusable wrapper whose datum is replaced per record. */
+  private final AvroKey<K> mCurrentKey = new AvroKey<K>();
+  /** The current value the reader is on; one reusable wrapper whose datum is replaced per record. */
+  private final AvroValue<V> mCurrentValue = new AvroValue<V>();
+
+  /** Returns the reusable key wrapper; the same instance is returned on every call. */
+  @Override
+  public AvroKey<K> getCurrentKey() throws IOException,
+      InterruptedException {
+    return mCurrentKey;
+  }
+
+  /** Returns the reusable value wrapper; the same instance is returned on every call. */
+  @Override
+  public AvroValue<V> getCurrentValue() throws IOException,
+      InterruptedException {
+    return mCurrentValue;
+  }
+
+  /**
+   * Advances the underlying reader and, when a record was read, splits it into
+   * the key and value wrappers.
+   *
+   * <p>When the base class reports that no record was read, the wrappers are
+   * left untouched: they keep the previous record's key/value, or a null datum
+   * if nothing has been read yet.</p>
+   *
+   * @return Whatever the base class reports for the advance.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    boolean hasNext = super.nextKeyValue();
+    if (hasNext) {
+      // Only unpack after a successful advance. Presumably the current record is
+      // still null when the input holds no records at all (verify against the
+      // base class); unpacking it then would fail with a NullPointerException.
+      AvroKeyValue<K, V> avroKeyValue = new AvroKeyValue<K, V>(getCurrentRecord());
+      mCurrentKey.datum(avroKeyValue.getKey());
+      mCurrentValue.datum(avroKeyValue.getValue());
+    }
+    return hasNext;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes key/value pairs to a Trevni container file.
+ *
+ * <p>Each entry in the Trevni container file will be a generic record with two fields,
+ * named 'key' and 'value'. The key and value arrive wrapped in AvroKey and AvroValue;
+ * their datums are stored in the record as they are.</p>
+ *
+ * <p>NOTE(review): earlier documentation said that basic Writable objects like Text or
+ * IntWritable are accepted and converted to their Avro types. In this class the
+ * converters are only asked for their writer schemas and are never used to convert
+ * a datum, so confirm before relying on that.</p>
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public class AvroTrevniKeyValueRecordWriter<K, V> extends AvroTrevniRecordWriterBase<AvroKey<K>, AvroValue<V>, GenericRecord> {
+
+  /** The writer schema for the generic record entries of the Trevni container file. */
+  Schema mKeyValuePairSchema;
+
+  /** A reusable Avro generic record for writing key/value pairs to the file. */
+  AvroKeyValue<Object, Object> keyValueRecord;
+
+  /** Converter created for the job's output key class; supplies the key writer schema. */
+  AvroDatumConverter<K, ?> keyConverter;
+
+  /** Converter created for the job's output value class; supplies the value writer schema. */
+  AvroDatumConverter<V, ?> valueConverter;
+
+  /**
+   * Constructor.
+   *
+   * <p>After the base class is set up, the pair schema is computed through
+   * {@link #initSchema(TaskAttemptContext)} and the reusable record is allocated
+   * against it.</p>
+   *
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   * @throws IOException If the base writer cannot be set up.
+   */
+  public AvroTrevniKeyValueRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    super(context);
+
+    mKeyValuePairSchema = initSchema(context);
+    final GenericRecord reusableRecord = new GenericData.Record(mKeyValuePairSchema);
+    keyValueRecord = new AvroKeyValue<Object, Object>(reusableRecord);
+  }
+
+  /**
+   * Stores the key and value datums in the reusable record, appends that record
+   * to the writer, and flushes once the writer's size estimate reaches the block size.
+   *
+   * @param key The wrapper holding the key datum.
+   * @param value The wrapper holding the value datum.
+   */
+  @Override
+  public void write(AvroKey<K> key, AvroValue<V> value) throws IOException,
+      InterruptedException {
+    final K keyDatum = key.datum();
+    keyValueRecord.setKey(keyDatum);
+    final V valueDatum = value.datum();
+    keyValueRecord.setValue(valueDatum);
+
+    final GenericRecord pair = keyValueRecord.get();
+    writer.write(pair);
+
+    final boolean blockFull = writer.sizeEstimate() >= blockSize;
+    if (blockFull) {
+      flush();
+    }
+  }
+
+  /**
+   * Builds the key/value pair schema from the job's output key and value classes.
+   *
+   * <p>As a side effect this (re)assigns {@code keyConverter} and
+   * {@code valueConverter}.</p>
+   *
+   * @param context The task attempt context carrying the job configuration.
+   * @return The generic record schema combining the key and value writer schemas.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Schema initSchema(TaskAttemptContext context) {
+    final AvroDatumConverterFactory factory =
+        new AvroDatumConverterFactory(context.getConfiguration());
+
+    final Class<K> keyClass = (Class<K>) context.getOutputKeyClass();
+    keyConverter = factory.create(keyClass);
+    final Class<V> valueClass = (Class<V>) context.getOutputValueClass();
+    valueConverter = factory.create(valueClass);
+
+    // Combine both writer schemas into the 'key'/'value' record schema.
+    final Schema keySchema = keyConverter.getWriterSchema();
+    final Schema valueSchema = valueConverter.getWriterSchema();
+    return AvroKeyValue.getSchema(keySchema, valueSchema);
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniKeyValueRecordWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.HadoopInput;
+
+/**
+ * Abstract base class for <code>RecordReader</code>s that read Trevni container files.
+ *
+ * <p>Each call to {@link #nextKeyValue()} fetches one record from the file;
+ * subclasses obtain it through {@link #getCurrentRecord()} and decide how it is
+ * exposed as key and value.</p>
+ *
+ * @param <K> The type of key the record reader should generate.
+ * @param <V> The type of value the record reader should generate.
+ * @param <T> The type of the entries within the Trevni container file being read.
+ */
+public abstract class AvroTrevniRecordReaderBase<K, V, T> extends RecordReader<K, V> {
+
+  /** The Trevni file reader; null until <code>initialize</code> has opened the file. */
+  private AvroColumnReader<T> reader;
+
+  /** Number of rows in the Trevni file. */
+  private long rows;
+
+  /** Number of rows read so far. */
+  private long row;
+
+  /** The record most recently read from the Trevni file, or null before the first read. */
+  private T mCurrentRecord;
+
+  /**
+   * Opens the Trevni file named by the split and records its row count.
+   *
+   * <p>Only the split's path is used, so the file is read from its first row.
+   * When the job configuration carries an input key schema, that schema is
+   * handed to the reader; otherwise no schema is set on the reader parameters.</p>
+   *
+   * @param inputSplit The split to read; it is cast to <code>FileSplit</code>.
+   * @param context The task context supplying the job configuration.
+   * @throws IOException If the Trevni file cannot be opened.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final FileSplit file = (FileSplit) inputSplit;
+    context.setStatus(file.toString());
+
+    final AvroColumnReader.Params params =
+        new AvroColumnReader.Params(new HadoopInput(file.getPath(), context.getConfiguration()));
+    params.setModel(ReflectData.get());
+
+    // Look the schema up once instead of once for the test and once for the use.
+    final org.apache.avro.Schema inputKeySchema =
+        AvroJob.getInputKeySchema(context.getConfiguration());
+    if (inputKeySchema != null) {
+      params.setSchema(inputKeySchema);
+    }
+
+    reader = new AvroColumnReader<T>(params);
+    rows = reader.getRowCount();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!reader.hasNext()) {
+      return false;
+    }
+    mCurrentRecord = reader.next();
+    row++;
+    return true;
+  }
+
+  /**
+   * Gets the current record read from the Trevni container file.
+   *
+   * <p>Calling <code>nextKeyValue()</code> moves this to the next record.</p>
+   *
+   * @return The current Trevni record (may be null if no record has been read).
+   */
+  protected T getCurrentRecord() {
+    return mCurrentRecord;
+  }
+
+  /**
+   * Closes the underlying Trevni reader, if one was opened.
+   *
+   * @throws IOException If closing the reader fails.
+   */
+  @Override
+  public void close() throws IOException {
+    // initialize() may have failed (or never run) before the reader was assigned.
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  /**
+   * Reports the fraction of rows read so far.
+   *
+   * @return <code>rowsRead / totalRows</code>, or 0 when the file has no rows
+   *         or has not been opened yet.
+   */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    if (rows == 0) {
+      // Without this guard the division below is 0 / 0 and yields NaN.
+      return 0.0f;
+    }
+    return (float) row / rows;
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordReaderBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.MetaData;
+import org.apache.trevni.avro.AvroColumnWriter;
+
+/**
+ * Abstract base class for <code>RecordWriter</code>s that write Trevni container files.
+ *
+ * <p>Records are handed to a column writer; every call to {@link #flush()} writes
+ * that writer's contents to the next numbered <code>part-N.trv</code> file in the
+ * task's directory and starts a fresh writer.</p>
+ *
+ * @param <K> The type of key the record writer should generate.
+ * @param <V> The type of value the record writer should generate.
+ * @param <T> The type of the entries within the Trevni container file being written.
+ */
+public abstract class AvroTrevniRecordWriterBase<K,V, T> extends RecordWriter<K, V> {
+
+  /** trevni file extension */
+  public final static String EXT = ".trv";
+
+  /** prefix of job configs that we care about */
+  public static final String META_PREFIX = "trevni.meta.";
+
+  /**
+   * Counter that increments as new trevni files are created because the current
+   * file has exceeded the block size.
+   */
+  protected int part = 0;
+
+  /** Trevni file writer */
+  protected AvroColumnWriter<T> writer;
+
+  /** This will be a unique directory linked to the task */
+  final Path dirPath;
+
+  /** HDFS object */
+  final FileSystem fs;
+
+  /** Current configured blocksize */
+  final long blockSize;
+
+  /** Provided avro schema from the context */
+  protected Schema schema;
+
+  /** meta data to be stored in the output file. */
+  protected ColumnFileMetaData meta;
+
+  /**
+   * Constructor.
+   *
+   * <p>Note that {@link #initSchema(TaskAttemptContext)} is invoked from here,
+   * i.e. before the body of any subclass constructor has run.</p>
+   *
+   * @param context The TaskAttemptContext to supply the writer with information from the job configuration
+   * @throws IOException If the task's output directory cannot be created.
+   */
+  public AvroTrevniRecordWriterBase(TaskAttemptContext context) throws IOException {
+
+    schema = initSchema(context);
+    meta = filterMetadata(context.getConfiguration());
+    writer = newWriter();
+
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+
+    // Resolve the per-task directory against the job output path instead of
+    // gluing the two together as strings.
+    final String dir = FileOutputFormat.getUniqueFile(context, "part", "");
+    dirPath = new Path(outputPath, dir);
+    fs = dirPath.getFileSystem(context.getConfiguration());
+    fs.mkdirs(dirPath);
+
+    blockSize = fs.getDefaultBlockSize();
+  }
+
+  /**
+   * Use the task context to construct a schema for writing.
+   *
+   * @param context The task context carrying the job configuration.
+   * @return The schema of the records written to the Trevni file.
+   */
+  abstract protected Schema initSchema(TaskAttemptContext context);
+
+  /**
+   * A Trevni flush will close the current file and prep a new writer.
+   *
+   * @throws IOException If the part file cannot be created or written.
+   */
+  public void flush() throws IOException {
+    OutputStream out = fs.create(new Path(dirPath, "part-" + (part++) + EXT));
+    try {
+      writer.writeTo(out);
+    } finally {
+      out.close();
+    }
+    writer = newWriter();
+  }
+
+  /**
+   * Writes whatever the current writer still holds to a final part file.
+   *
+   * @param arg0 The task context (not used).
+   */
+  @Override
+  public void close(TaskAttemptContext arg0) throws IOException,
+      InterruptedException {
+    flush();
+  }
+
+  /**
+   * Collects the configuration entries whose key starts with {@link #META_PREFIX}.
+   *
+   * @param configuration The job configuration to scan.
+   * @return Metadata holding each matching entry, keyed by the configuration key
+   *         with the prefix removed and valued by the UTF-8 bytes of its value.
+   */
+  static ColumnFileMetaData filterMetadata(final Configuration configuration) {
+    final ColumnFileMetaData meta = new ColumnFileMetaData();
+
+    for (Entry<String, String> confEntry : configuration) {
+      final String key = confEntry.getKey();
+      if (key.startsWith(META_PREFIX)) {
+        meta.put(key.substring(META_PREFIX.length()),
+            confEntry.getValue().getBytes(MetaData.UTF8));
+      }
+    }
+
+    return meta;
+  }
+
+  /** Creates an empty column writer for the configured schema and metadata. */
+  private AvroColumnWriter<T> newWriter() throws IOException {
+    return new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/main/java/org/apache/trevni/avro/mapreduce/AvroTrevniRecordWriterBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/TestWordCount.java Tue Feb 19 17:15:22 2013
@@ -50,7 +50,6 @@ import org.apache.avro.Schema;
import org.junit.Test;
import static org.junit.Assert.*;
-import static org.apache.trevni.avro.WordCountUtil.DIR;
public class TestWordCount {
@@ -82,14 +81,16 @@ public class TestWordCount {
testInputFormat();
}
- private static final Schema STRING = Schema.create(Schema.Type.STRING);
+ static final Schema STRING = Schema.create(Schema.Type.STRING);
static { GenericData.setStringType(STRING, GenericData.StringType.String); }
- private static final Schema LONG = Schema.create(Schema.Type.LONG);
+ static final Schema LONG = Schema.create(Schema.Type.LONG);
public void testOutputFormat() throws Exception {
JobConf job = new JobConf();
- WordCountUtil.writeLinesFile();
+ WordCountUtil wordCountUtil = new WordCountUtil("trevniMapredTest");
+
+ wordCountUtil.writeLinesFile();
AvroJob.setInputSchema(job, STRING);
AvroJob.setOutputSchema(job, Pair.getPairSchema(STRING,LONG));
@@ -98,15 +99,15 @@ public class TestWordCount {
AvroJob.setCombinerClass(job, ReduceImpl.class);
AvroJob.setReducerClass(job, ReduceImpl.class);
- FileInputFormat.setInputPaths(job, new Path(DIR + "/in"));
- FileOutputFormat.setOutputPath(job, new Path(DIR + "/out"));
+ FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+ FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
FileOutputFormat.setCompressOutput(job, true);
job.setOutputFormat(AvroTrevniOutputFormat.class);
JobClient.runJob(job);
- WordCountUtil.validateCountsFile();
+ wordCountUtil.validateCountsFile();
}
private static long total;
@@ -121,6 +122,9 @@ public class TestWordCount {
public void testInputFormat() throws Exception {
JobConf job = new JobConf();
+ WordCountUtil wordCountUtil = new WordCountUtil("trevniMapredTest");
+
+
Schema subSchema = Schema.parse("{\"type\":\"record\"," +
"\"name\":\"PairValue\","+
"\"fields\": [ " +
@@ -128,7 +132,7 @@ public class TestWordCount {
"]}");
AvroJob.setInputSchema(job, subSchema);
AvroJob.setMapperClass(job, Counter.class);
- FileInputFormat.setInputPaths(job, new Path(DIR + "/out/*"));
+ FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
job.setInputFormat(AvroTrevniInputFormat.class);
job.setNumReduceTasks(0); // map-only
Modified: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java?rev=1447823&r1=1447822&r2=1447823&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java (original)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/WordCountUtil.java Tue Feb 19 17:15:22 2013
@@ -41,9 +41,11 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.DataFileStream;
@@ -51,12 +53,20 @@ import org.apache.avro.mapred.Pair;
public class WordCountUtil {
- public static final File DIR = new File("target", "wc");
- public static final File LINES_FILE
- = new File(new File(DIR, "in"), "lines.avro");
- static final File COUNTS_FILE
- = new File(new File(DIR, "out"), "part-00000/part-0.trv");
+ public File dir;
+ public File linesFiles;
+ public File countFiles;
+ /**
+  * Creates a utility for the named test whose counts file is looked up under
+  * the <code>part-00000</code> output directory.
+  */
+ public WordCountUtil (String testName) {
+ this(testName, "part-00000");
+ }
+
+ /**
+  * Creates a utility for the named test.
+  *
+  * @param testName Name of the working directory created under <code>target/wc</code>.
+  * @param partDirName Name of the output sub-directory that holds <code>part-0.trv</code>.
+  */
+ public WordCountUtil (String testName, String partDirName) {
+   dir = new File("target/wc", testName);
+
+   File inputDir = new File(dir, "in");
+   linesFiles = new File(inputDir, "lines.avro");
+
+   File outputDir = new File(dir, "out");
+   countFiles = new File(outputDir, partDirName + "/part-0.trv");
+ }
+
public static final String[] LINES = new String[] {
"the quick brown fox jumps over the lazy dog",
"the cow jumps over the moon",
@@ -80,21 +90,25 @@ public class WordCountUtil {
TOTAL = total;
}
- public static void writeLinesFile() throws IOException {
- FileUtil.fullyDelete(DIR);
+ /** Returns the working directory of this test (<code>target/wc/&lt;testName&gt;</code>). */
+ public File getDir() {
+ return dir;
+ }
+
+ public void writeLinesFile() throws IOException {
+ FileUtil.fullyDelete(dir);
DatumWriter<String> writer = new GenericDatumWriter<String>();
DataFileWriter<String> out = new DataFileWriter<String>(writer);
- LINES_FILE.getParentFile().mkdirs();
- out.create(Schema.create(Schema.Type.STRING), LINES_FILE);
+ linesFiles.getParentFile().mkdirs();
+ out.create(Schema.create(Schema.Type.STRING), linesFiles);
for (String line : LINES)
out.append(line);
out.close();
}
- public static void validateCountsFile() throws Exception {
+ public void validateCountsFile() throws Exception {
AvroColumnReader<Pair<String,Long>> reader =
new AvroColumnReader<Pair<String,Long>>
- (new AvroColumnReader.Params(COUNTS_FILE).setModel(SpecificData.get()));
+ (new AvroColumnReader.Params(countFiles).setModel(SpecificData.get()));
int numWords = 0;
for (Pair<String,Long> wc : reader) {
assertEquals(wc.key(), COUNTS.get(wc.key()), wc.value());
@@ -103,5 +117,19 @@ public class WordCountUtil {
reader.close();
assertEquals(COUNTS.size(), numWords);
}
+
+ /**
+  * Reads the counts file back as generic records and checks that the
+  * <code>value</code> of every record equals the expected count for its
+  * <code>key</code>, and that the number of records equals the number of
+  * expected words.
+  */
+ public void validateCountsFileGenericRecord() throws Exception {
+   AvroColumnReader<GenericRecord> reader =
+       new AvroColumnReader<GenericRecord>(
+           new AvroColumnReader.Params(countFiles).setModel(SpecificData.get()));
+   int numWords = 0;
+   try {
+     for (GenericRecord wc : reader) {
+       assertEquals((String)wc.get("key"), COUNTS.get(wc.get("key")), (Long)wc.get("value"));
+       numWords++;
+     }
+   } finally {
+     // Close the reader even when one of the assertions above fails.
+     reader.close();
+   }
+   assertEquals(COUNTS.size(), numWords);
+ }
}
Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.trevni.avro.WordCountUtil;
+import org.junit.Test;
+
+/**
+ * Word-count round trip over the <code>mapreduce</code> API: one job writes the
+ * counts with <code>AvroTrevniKeyValueOutputFormat</code>, a second job reads
+ * them back with <code>AvroTrevniKeyValueInputFormat</code>.
+ */
+public class TestKeyValueWordCount {
+
+  /** Name of the working directory under <code>target/wc</code> shared by both jobs. */
+  private static final String TEST_NAME = "trevniMapReduceKeyValueTest";
+
+  /**
+   * Sum of all counts seen by {@link Counter}.
+   * NOTE(review): a static is only visible to the test if the mappers run inside
+   * the test JVM (presumably the local job runner) -- confirm.
+   */
+  private static long total = 0;
+
+  static final Schema STRING = Schema.create(Schema.Type.STRING);
+  static { GenericData.setStringType(STRING, GenericData.StringType.String); }
+  static final Schema LONG = Schema.create(Schema.Type.LONG);
+
+  /** Emits <code>(token, 1)</code> for every whitespace-separated token of a line. */
+  private static class WordCountMapper extends
+      Mapper<AvroKey<String>, NullWritable, Text, LongWritable> {
+    private LongWritable mCount = new LongWritable();
+    private Text mText = new Text();
+
+    @Override
+    protected void setup(Context context) {
+      mCount.set(1);
+    }
+
+    @Override
+    protected void map(AvroKey<String> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      try {
+        StringTokenizer tokens = new StringTokenizer(key.datum());
+        while (tokens.hasMoreTokens()) {
+          mText.set(tokens.nextToken());
+          context.write(mText, mCount);
+        }
+      } catch (Exception e) {
+        // Re-throw with the offending record attached to ease debugging.
+        throw new RuntimeException(key + " " + key.datum(), e);
+      }
+
+    }
+  }
+
+  /** Sums the counts of one word and emits it as an Avro key/value pair. */
+  private static class WordCountReducer extends Reducer< Text, LongWritable, AvroKey<String>, AvroValue<Long>> {
+
+    AvroKey<String> resultKey = new AvroKey<String>();
+    AvroValue<Long> resultValue = new AvroValue<Long>();
+
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+      long sum = 0;
+      for (LongWritable value: values) {
+        sum += value.get();
+      }
+      resultKey.datum(key.toString());
+      resultValue.datum(sum);
+
+      context.write(resultKey, resultValue);
+    }
+  }
+
+  /** Adds the value of every pair it reads to {@link #total}. */
+  public static class Counter extends
+      Mapper<AvroKey<String>, AvroValue<Long>, NullWritable, NullWritable> {
+    @Override
+    protected void map(AvroKey<String> key, AvroValue<Long> value, Context context)
+        throws IOException, InterruptedException {
+      total += value.datum();
+    }
+  }
+
+  /**
+   * Runs the write job and then the read job. The read job consumes the files
+   * the write job produces, so the two must run in this order; as separate
+   * test methods their execution order would not be guaranteed.
+   */
+  @Test
+  public void runTestsInOrder() throws Exception {
+    testKeyOutputFormat();
+    testInputFormat();
+  }
+
+  /** Counts the words of the lines file and validates the resulting Trevni file. */
+  public void testKeyOutputFormat() throws Exception {
+    Job job = new Job();
+
+    WordCountUtil wordCountUtil = new WordCountUtil(TEST_NAME, "part-r-00000");
+
+    wordCountUtil.writeLinesFile();
+
+    AvroJob.setInputKeySchema(job, STRING);
+    AvroJob.setOutputKeySchema(job, STRING);
+    AvroJob.setOutputValueSchema(job, LONG);
+
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(WordCountReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setOutputFormatClass(AvroTrevniKeyValueOutputFormat.class);
+
+    runJob(job);
+
+    wordCountUtil.validateCountsFileGenericRecord();
+  }
+
+  /** Reads the Trevni output back and checks the sum of all counts. */
+  public void testInputFormat() throws Exception {
+    Job job = new Job();
+
+    WordCountUtil wordCountUtil = new WordCountUtil(TEST_NAME);
+
+    job.setMapperClass(Counter.class);
+
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
+    job.setInputFormatClass(AvroTrevniKeyValueInputFormat.class);
+
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    total = 0;
+    runJob(job);
+    assertEquals(WordCountUtil.TOTAL, total);
+
+  }
+
+  /** Runs the job to completion and fails the test if the job reports failure. */
+  private static void runJob(Job job) throws Exception {
+    if (!job.waitForCompletion(true)) {
+      throw new AssertionError("MapReduce job failed: " + job.getJobName());
+    }
+  }
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyValueWordCount.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java?rev=1447823&view=auto
==============================================================================
--- avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java (added)
+++ avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java Tue Feb 19 17:15:22 2013
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.trevni.avro.WordCountUtil;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
+import org.junit.Test;
+
+/**
+ * Word-count round trip over the <code>mapreduce</code> API: one job writes the
+ * counts as key-only records with <code>AvroTrevniKeyOutputFormat</code>, a
+ * second job reads them back with <code>AvroTrevniKeyInputFormat</code>.
+ */
+public class TestKeyWordCount {
+
+  /** Name of the working directory under <code>target/wc</code> shared by both jobs. */
+  private static final String TEST_NAME = "trevniMapReduceKeyTest";
+
+  /**
+   * Sum of all counts seen by {@link Counter}.
+   * NOTE(review): a static is only visible to the test if the mappers run inside
+   * the test JVM (presumably the local job runner) -- confirm.
+   */
+  private static long total = 0;
+
+  static final Schema STRING = Schema.create(Schema.Type.STRING);
+  static { GenericData.setStringType(STRING, GenericData.StringType.String); }
+  static final Schema LONG = Schema.create(Schema.Type.LONG);
+
+  /** Emits <code>(token, 1)</code> for every whitespace-separated token of a line. */
+  private static class WordCountMapper extends
+      Mapper<AvroKey<String>, NullWritable, Text, LongWritable> {
+    private LongWritable mCount = new LongWritable();
+    private Text mText = new Text();
+
+    @Override
+    protected void setup(Context context) {
+      mCount.set(1);
+    }
+
+    @Override
+    protected void map(AvroKey<String> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      try {
+        StringTokenizer tokens = new StringTokenizer(key.datum());
+        while (tokens.hasMoreTokens()) {
+          mText.set(tokens.nextToken());
+          context.write(mText, mCount);
+        }
+      } catch (Exception e) {
+        // Re-throw with the offending record attached to ease debugging.
+        throw new RuntimeException(key + " " + key.datum() , e);
+      }
+
+    }
+  }
+
+  /** Sums the counts of one word and emits them as a single pair record in the key. */
+  private static class WordCountReducer extends Reducer< Text, LongWritable, AvroKey<GenericData.Record>, NullWritable> {
+
+    /** Output key whose record is refilled for every word. */
+    private AvroKey<GenericData.Record> result ;
+
+    @Override
+    protected void setup(Context context) {
+      result = new AvroKey<GenericData.Record>();
+      result.datum(new Record(Pair.getPairSchema(STRING,LONG)));
+    }
+
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+      long count = 0;
+      for (LongWritable value: values) {
+        count += value.get();
+      }
+
+      result.datum().put("key", key.toString());
+      result.datum().put("value", count);
+
+      context.write(result, NullWritable.get());
+    }
+  }
+
+  /** Adds the <code>value</code> field of every record it reads to {@link #total}. */
+  public static class Counter extends
+      Mapper<AvroKey<GenericData.Record>, NullWritable, NullWritable, NullWritable> {
+    @Override
+    protected void map(AvroKey<GenericData.Record> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+      total += (Long)key.datum().get("value");
+    }
+  }
+
+  /**
+   * Runs the write job and then the read job. The read job consumes the files
+   * the write job produces, so the two must run in this order; as separate
+   * test methods their execution order would not be guaranteed.
+   */
+  @Test
+  public void runTestsInOrder() throws Exception {
+    testKeyOutputFormat();
+    testInputFormat();
+  }
+
+  /** Counts the words of the lines file and validates the resulting Trevni file. */
+  public void testKeyOutputFormat() throws Exception {
+    Job job = new Job();
+
+    WordCountUtil wordCountUtil = new WordCountUtil(TEST_NAME, "part-r-00000");
+
+    wordCountUtil.writeLinesFile();
+
+    AvroJob.setInputKeySchema(job, STRING);
+    AvroJob.setOutputKeySchema(job, Pair.getPairSchema(STRING,LONG));
+
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(WordCountReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
+    FileOutputFormat.setCompressOutput(job, true);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setOutputFormatClass(AvroTrevniKeyOutputFormat.class);
+
+    runJob(job);
+
+    wordCountUtil.validateCountsFile();
+  }
+
+  /** Reads only the <code>value</code> column of the Trevni output and checks its sum. */
+  public void testInputFormat() throws Exception {
+    Job job = new Job();
+
+    WordCountUtil wordCountUtil = new WordCountUtil(TEST_NAME);
+
+    job.setMapperClass(Counter.class);
+
+    // Read schema that keeps just the "value" field of each stored pair.
+    Schema subSchema = Schema.parse("{\"type\":\"record\"," +
+                                    "\"name\":\"PairValue\","+
+                                    "\"fields\": [ " +
+                                    "{\"name\":\"value\", \"type\":\"long\"}" +
+                                    "]}");
+    AvroJob.setInputKeySchema(job, subSchema);
+
+    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
+    job.setInputFormatClass(AvroTrevniKeyInputFormat.class);
+
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    total = 0;
+    runJob(job);
+    assertEquals(WordCountUtil.TOTAL, total);
+
+  }
+
+  /** Runs the job to completion and fails the test if the job reports failure. */
+  private static void runJob(Job job) throws Exception {
+    if (!job.waitForCompletion(true)) {
+      throw new AssertionError("MapReduce job failed: " + job.getJobName());
+    }
+  }
+
+}
Propchange: avro/trunk/lang/java/trevni/avro/src/test/java/org/apache/trevni/avro/mapreduce/TestKeyWordCount.java
------------------------------------------------------------------------------
svn:eol-style = native