You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/09/18 00:22:36 UTC
svn commit: r998354 - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/file/
lang/java/src/java/org/apache/avro/mapred/
lang/java/src/java/org/apache/avro/tool/
lang/java/src/test/java/org/apache/avro/mapred/
Author: cutting
Date: Fri Sep 17 22:22:35 2010
New Revision: 998354
URL: http://svn.apache.org/viewvc?rev=998354&view=rev
Log:
AVRO-662. Java: Add support for reading Hadoop sequence files as Avro data to MapReduce API.
Added:
avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Sep 17 22:22:35 2010
@@ -10,6 +10,9 @@ Avro 1.4.1 (unreleased)
AVRO-641. Java: Add SASL security for socket-based RPC. (cutting)
+ AVRO-662. Java: Add support for reading Hadoop sequence files as
+ Avro data to MapReduce API. (cutting)
+
IMPROVEMENTS
AVRO-655. Change build so that 'dist' target no longer also runs C
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java Fri Sep 17 22:22:35 2010
@@ -29,7 +29,8 @@ import static org.apache.avro.file.DataF
/** Random access to files written with {@link DataFileWriter}.
* @see DataFileWriter
*/
-public class DataFileReader<D> extends DataFileStream<D> {
+public class DataFileReader<D>
+ extends DataFileStream<D> implements FileReader<D> {
private SeekableInputStream sin;
private long blockStart;
@@ -105,6 +106,8 @@ public class DataFileReader<D> extends D
return ((blockStart >= position+SYNC_SIZE)||(blockStart >= sin.length()));
}
+ @Override public long tell() throws IOException { return sin.tell(); }
+
private static class SeekableInputStream extends InputStream
implements SeekableInput {
private final byte[] oneByte = new byte[1];
Added: avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.io.Closeable;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+/** Interface for reading data from a file. */
+public interface FileReader<D> extends Iterator<D>, Iterable<D>, Closeable {
+  /** Return the schema for data in this file. */
+  Schema getSchema();
+
+  /** Read the next datum from the file.
+   * @param reuse an instance to reuse.
+   * @return the next datum (implementations may return {@code reuse} itself).
+   * @throws java.util.NoSuchElementException if no more remain in the file.
+   * @throws IOException if the underlying input cannot be read.
+   */
+  D next(D reuse) throws IOException;
+
+  /** Move to the next synchronization point after a position. To process a
+   * range of file entries, call this with the starting position, then check
+   * {@link #pastSync(long)} with the end point before each call to {@link
+   * #next()}. */
+  void sync(long position) throws IOException;
+
+  /** Return true if past the next synchronization point after a position. */
+  boolean pastSync(long position) throws IOException;
+
+  /** Return the current position in the input. */
+  long tell() throws IOException;
+
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Fri Sep 17 22:22:35 2010
@@ -95,6 +95,11 @@ public class AvroJob {
}
}
+  /**
+   * Configure a job whose input files are Hadoop SequenceFiles; each entry
+   * is presented to the job as an Avro {@link Pair} of key and value.
+   */
+  public static void setInputSequenceFile(JobConf job) {
+    job.setInputFormat(SequenceFileInputFormat.class);
+  }
+
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
return Schema.parse(job.get(OUTPUT_SCHEMA));
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Fri Sep 17 22:22:35 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.specific.SpecificDatumReader;
@@ -33,20 +33,23 @@ import org.apache.avro.specific.Specific
public class AvroRecordReader<T>
implements RecordReader<AvroWrapper<T>, NullWritable> {
- private FsInput in;
- private DataFileReader<T> reader;
+ private FileReader<T> reader;
private long start;
private long end;
public AvroRecordReader(JobConf job, FileSplit split)
throws IOException {
- this.in = new FsInput(split.getPath(), job);
-
- Schema s = AvroJob.getInputSchema(job);
- this.reader = new DataFileReader<T>(in, new SpecificDatumReader<T>(s));
+ this(new DataFileReader<T>
+ (new FsInput(split.getPath(), job),
+ new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
+ split);
+ }
+ protected AvroRecordReader(FileReader<T> reader, FileSplit split)
+ throws IOException {
+ this.reader = reader;
reader.sync(split.getStart()); // sync to start
- this.start = in.tell();
+ this.start = reader.tell();
this.end = split.getStart() + split.getLength();
}
@@ -68,12 +71,12 @@ public class AvroRecordReader<T>
if (end == start) {
return 0.0f;
} else {
- return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+ return Math.min(1.0f, (getPos() - start) / (float)(end - start));
}
}
public long getPos() throws IOException {
- return in.tell();
+ return reader.tell();
}
public void close() throws IOException { reader.close(); }
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for sequence files: every
+ * entry is delivered wrapped as an {@link AvroWrapper} around a {@link Pair}.
+ */
+public class SequenceFileInputFormat<K,V>
+  extends FileInputFormat<AvroWrapper<Pair<K,V>>, NullWritable> {
+
+  /** Report the split as task status, then open a
+   * {@link SequenceFileRecordReader} over it. */
+  @Override
+  public RecordReader<AvroWrapper<Pair<K,V>>, NullWritable>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    reporter.setStatus(split.toString());
+    FileSplit fileSplit = (FileSplit)split;
+    return new SequenceFileRecordReader<K,V>(job, fileSplit);
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.NoSuchElementException;
+import java.net.URI;
+import java.lang.reflect.Type;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.reflect.ReflectData;
+
+/** A {@link FileReader} for sequence files. */
+@SuppressWarnings(value="unchecked")
+public class SequenceFileReader<K,V> implements FileReader<Pair<K,V>> {
+ private SequenceFile.Reader reader;
+ private Schema schema;
+ private boolean ready = false; // true iff done & key are current
+ private boolean done = false; // true iff at EOF
+ private Writable key, spareKey, value;
+
+ private Converter<K> keyConverter =
+ new Converter<K>() { public K convert(Writable o) { return (K)o; } };
+
+ private Converter<V> valConverter =
+ new Converter<V>() { public V convert(Writable o) { return (V)o; } };
+
+ public SequenceFileReader(File file) throws IOException {
+ this(file.toURI(), new Configuration());
+ }
+
+ public SequenceFileReader(URI uri, Configuration c) throws IOException {
+ this(new SequenceFile.Reader(FileSystem.get(uri, c),
+ new Path(uri.toString()), c), c);
+ }
+
+ public SequenceFileReader(SequenceFile.Reader reader, Configuration conf) {
+ this.reader = reader;
+ this.schema =
+ Pair.getPairSchema(WritableData.get().getSchema(reader.getKeyClass()),
+ WritableData.get().getSchema(reader.getValueClass()));
+ this.key =
+ (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ this.spareKey =
+ (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ this.value =
+ (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ if (WRITABLE_CONVERTERS.containsKey(reader.getKeyClass()) )
+ keyConverter = WRITABLE_CONVERTERS.get(reader.getKeyClass());
+ if (WRITABLE_CONVERTERS.containsKey(reader.getValueClass()) )
+ valConverter = WRITABLE_CONVERTERS.get(reader.getValueClass());
+ }
+
+ @Override public void close() throws IOException { reader.close(); }
+
+ @Override public void remove() { throw new UnsupportedOperationException(); }
+
+ @Override public Iterator<Pair<K,V>> iterator() { return this; }
+
+ @Override public Schema getSchema() { return schema; }
+
+ private void prepare() throws IOException {
+ if (ready) return;
+ this.done = !reader.next(key);
+ ready = true;
+ }
+
+ @Override public boolean hasNext() {
+ try {
+ prepare();
+ return !done;
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+
+ @Override public Pair<K,V> next() {
+ try {
+ return next(null);
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+
+ @Override public Pair<K,V> next(Pair<K,V> reuse) throws IOException {
+ prepare();
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Pair<K,V> result = reuse;
+ if (result == null)
+ result = new Pair<K,V>(schema);
+
+ result.key(keyConverter.convert(key));
+ reader.getCurrentValue(value);
+ result.value(valConverter.convert(value));
+
+ // swap key and spareKey
+ Writable k = key;
+ key = spareKey;
+ spareKey = k;
+
+ ready = false;
+
+ return result;
+ }
+
+ @Override public void sync(long position) throws IOException {
+ if (position > reader.getPosition())
+ reader.sync(position);
+ ready = false;
+ }
+
+ @Override public boolean pastSync(long position) throws IOException {
+ return reader.getPosition() >= position && reader.syncSeen();
+ }
+
+ @Override public long tell() throws IOException {return reader.getPosition();}
+
+ private static final Map<Type,Schema> WRITABLE_SCHEMAS =
+ new HashMap<Type,Schema>();
+ static {
+ WRITABLE_SCHEMAS.put(NullWritable.class,
+ Schema.create(Schema.Type.NULL));
+ WRITABLE_SCHEMAS.put(BooleanWritable.class,
+ Schema.create(Schema.Type.BOOLEAN));
+ WRITABLE_SCHEMAS.put(IntWritable.class,
+ Schema.create(Schema.Type.INT));
+ WRITABLE_SCHEMAS.put(LongWritable.class,
+ Schema.create(Schema.Type.LONG));
+ WRITABLE_SCHEMAS.put(FloatWritable.class,
+ Schema.create(Schema.Type.FLOAT));
+ WRITABLE_SCHEMAS.put(DoubleWritable.class,
+ Schema.create(Schema.Type.DOUBLE));
+ WRITABLE_SCHEMAS.put(BytesWritable.class,
+ Schema.create(Schema.Type.BYTES));
+ WRITABLE_SCHEMAS.put(Text.class,
+ Schema.create(Schema.Type.STRING));
+ }
+
+ private static class WritableData extends ReflectData {
+ private static final WritableData INSTANCE = new WritableData();
+ protected WritableData() {}
+
+ /** Return the singleton instance. */
+ public static WritableData get() { return INSTANCE; }
+
+ @Override public Schema getSchema(java.lang.reflect.Type type) {
+ if (WRITABLE_SCHEMAS.containsKey(type))
+ return WRITABLE_SCHEMAS.get(type);
+ else
+ return super.getSchema(type);
+ }
+ }
+
+ private interface Converter<T> {
+ T convert(Writable o);
+ }
+
+ private static final Map<Type,Converter> WRITABLE_CONVERTERS =
+ new HashMap<Type,Converter>();
+ static {
+ WRITABLE_CONVERTERS.put
+ (NullWritable.class,
+ new Converter<Void>() {
+ public Void convert(Writable o) { return null; }
+ });
+ WRITABLE_CONVERTERS.put
+ (BooleanWritable.class,
+ new Converter<Boolean>() {
+ public Boolean convert(Writable o) {return ((BooleanWritable)o).get();}
+ });
+ WRITABLE_CONVERTERS.put
+ (IntWritable.class,
+ new Converter<Integer>() {
+ public Integer convert(Writable o) { return ((IntWritable)o).get(); }
+ });
+ WRITABLE_CONVERTERS.put
+ (LongWritable.class,
+ new Converter<Long>() {
+ public Long convert(Writable o) { return ((LongWritable)o).get(); }
+ });
+ WRITABLE_CONVERTERS.put
+ (FloatWritable.class,
+ new Converter<Float>() {
+ public Float convert(Writable o) { return ((FloatWritable)o).get(); }
+ });
+ WRITABLE_CONVERTERS.put
+ (DoubleWritable.class,
+ new Converter<Double>() {
+ public Double convert(Writable o) { return ((DoubleWritable)o).get(); }
+ });
+ WRITABLE_CONVERTERS.put
+ (BytesWritable.class,
+ new Converter<ByteBuffer>() {
+ public ByteBuffer convert(Writable o) {
+ BytesWritable b = (BytesWritable)o;
+ return ByteBuffer.wrap(b.getBytes(), 0, b.getLength());
+ }
+ });
+ WRITABLE_CONVERTERS.put
+ (Text.class,
+ new Converter<String>() {
+ public String convert(Writable o) { return o.toString(); }
+ });
+ }
+
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * A {@link org.apache.hadoop.mapred.RecordReader} for sequence files.  It is
+ * an {@link AvroRecordReader} driven by a {@link SequenceFileReader} opened
+ * on the split's file.
+ */
+public class SequenceFileRecordReader<K,V> extends AvroRecordReader<Pair<K,V>> {
+
+  public SequenceFileRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    super(SequenceFileRecordReader.<K,V>openReader(job, split), split);
+  }
+
+  /** Open a reader on the whole file that contains {@code split}. */
+  private static <K,V> SequenceFileReader<K,V> openReader(JobConf job,
+                                                          FileSplit split)
+    throws IOException {
+    return new SequenceFileReader<K,V>(split.getPath().toUri(), job);
+  }
+
+}
+
Modified: avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java Fri Sep 17 22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.PrintStream;
import java.util.List;
import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.generic.GenericDatumReader;
@@ -55,7 +56,7 @@ public class DataFileReadTool implements
}
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
- DataFileReader<Object> fileReader =
+ FileReader<Object> fileReader =
new DataFileReader<Object>(new File(args.get(0)), reader);
try {
Schema schema = fileReader.getSchema();
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.URI;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.specific.SpecificDatumReader;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileReader {
+ private static final int COUNT =
+ Integer.parseInt(System.getProperty("test.count", "10"));
+ private static final File DIR
+ = new File(System.getProperty("test.dir", "/tmp"));
+ private static final File FILE = new File(DIR, "test.seq");
+
+ @BeforeClass
+ public static void testWriteSequenceFile() throws IOException {
+ FILE.delete();
+ Configuration c = new Configuration();
+ URI uri = FILE.toURI();
+ SequenceFile.Writer writer
+ = new SequenceFile.Writer(FileSystem.get(uri, c), c,
+ new Path(uri.toString()),
+ LongWritable.class, Text.class);
+ final LongWritable key = new LongWritable();
+ final Text val = new Text();
+ for (int i = 0; i < COUNT; ++i) {
+ key.set(i);
+ val.set(Integer.toString(i));
+ writer.append(key, val);
+ }
+ writer.close();
+ }
+
+ @Test
+ public void testReadSequenceFile() throws Exception {
+ checkFile(new SequenceFileReader<Long,CharSequence>(FILE));
+ }
+
+ public void checkFile(FileReader<Pair<Long,CharSequence>> reader) throws Exception {
+ long i = 0;
+ for (Pair<Long,CharSequence> p : reader) {
+ assertEquals((Long)i, p.key());
+ assertEquals(Long.toString(i), p.value().toString());
+ i++;
+ }
+ assertEquals(COUNT, i);
+ reader.close();
+ }
+
+ @Test
+ public void testSequenceFileInputFormat() throws Exception {
+ JobConf job = new JobConf();
+ Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+ output.getFileSystem(job).delete(output);
+
+ Schema schema = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+ Schema.create(Schema.Type.STRING));
+
+ AvroJob.setInputSequenceFile(job);
+
+ AvroJob.setInputSchema(job, schema);
+ AvroJob.setOutputSchema(job, schema);
+
+ FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+ FileOutputFormat.setOutputPath(job, output);
+
+ JobClient.runJob(job);
+
+ checkFile(new DataFileReader<Pair<Long,CharSequence>>
+ (new File(output.toString()+"/part-00000.avro"),
+ new SpecificDatumReader<Pair<Long,CharSequence>>()));
+ }
+
+}