Posted to commits@avro.apache.org by cu...@apache.org on 2010/09/18 00:22:36 UTC

svn commit: r998354 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/java/org/apache/avro/mapred/ lang/java/src/java/org/apache/avro/tool/ lang/java/src/test/java/org/apache/avro/mapred/

Author: cutting
Date: Fri Sep 17 22:22:35 2010
New Revision: 998354

URL: http://svn.apache.org/viewvc?rev=998354&view=rev
Log:
AVRO-662. Java: Add support for reading Hadoop sequence files as Avro data to MapReduce API.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Sep 17 22:22:35 2010
@@ -10,6 +10,9 @@ Avro 1.4.1 (unreleased)
 
     AVRO-641. Java: Add SASL security for socket-based RPC. (cutting)
 
+    AVRO-662. Java: Add support for reading Hadoop sequence files as
+    Avro data to MapReduce API. (cutting)
+
   IMPROVEMENTS
 
     AVRO-655. Change build so that 'dist' target no longer also runs C

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java Fri Sep 17 22:22:35 2010
@@ -29,7 +29,8 @@ import static org.apache.avro.file.DataF
 /** Random access to files written with {@link DataFileWriter}.
  * @see DataFileWriter
  */
-public class DataFileReader<D> extends DataFileStream<D> {
+public class DataFileReader<D>
+  extends DataFileStream<D> implements FileReader<D> {
   private SeekableInputStream sin;
   private long blockStart;
 
@@ -105,6 +106,8 @@ public class DataFileReader<D> extends D
     return ((blockStart >= position+SYNC_SIZE)||(blockStart >= sin.length()));
   }
 
+  @Override public long tell() throws IOException { return sin.tell(); }
+
   private static class SeekableInputStream extends InputStream 
   implements SeekableInput {
     private final byte[] oneByte = new byte[1];

Added: avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/FileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.io.Closeable;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+/** Interface for reading data from a file. */
+public interface FileReader<D> extends Iterator<D>, Iterable<D>, Closeable {
+  /** Return the schema for data in this file. */
+  Schema getSchema();
+
+  /** Read the next datum from the file.
+   * @param reuse an instance to reuse.
+   * @throws NoSuchElementException if no more remain in the file.
+   */
+  D next(D reuse) throws IOException;
+
+  /** Move to the next synchronization point after a position. To process a
+   * range of file entries, call this with the starting position, then check
+   * {@link #pastSync(long)} with the end point before each call to {@link
+   * #next()}. */
+  void sync(long position) throws IOException;
+
+  /** Return true if past the next synchronization point after a position. */ 
+  boolean pastSync(long position) throws IOException;
+
+  /** Return the current position in the input. */
+  long tell() throws IOException;
+
+}
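
For context, the sync/pastSync contract above is what lets a reader process just one byte range of a file. A minimal sketch of that pattern follows (not part of this commit; the class name, command-line arguments and use of a generic datum reader are illustrative assumptions):

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.FileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class RangeReadSketch {
      public static void main(String[] args) throws IOException {
        File file = new File(args[0]);                 // an Avro data file (illustrative path)
        long start = Long.parseLong(args[1]);          // start of the byte range to process
        long end = Long.parseLong(args[2]);            // end of the byte range
        FileReader<GenericRecord> reader =
          new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>());
        try {
          reader.sync(start);                          // move to the next sync point after start
          while (!reader.pastSync(end)) {              // stop once past the end of the range
            GenericRecord datum = reader.next(null);   // null: allocate a new datum, no reuse
            System.out.println(datum);
          }
        } finally {
          reader.close();
        }
      }
    }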

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Fri Sep 17 22:22:35 2010
@@ -95,6 +95,11 @@ public class AvroJob {
     }
   }
 
+  /** Indicate that a job's input files are in SequenceFile format. */
+  public static void setInputSequenceFile(JobConf job) {
+    job.setInputFormat(SequenceFileInputFormat.class);
+  }
+
   /** Return a job's output key schema. */
   public static Schema getOutputSchema(Configuration job) {
     return Schema.parse(job.get(OUTPUT_SCHEMA));

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Fri Sep 17 22:22:35 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 
-import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.specific.SpecificDatumReader;
 
@@ -33,20 +33,23 @@ import org.apache.avro.specific.Specific
 public class AvroRecordReader<T>
   implements RecordReader<AvroWrapper<T>, NullWritable> {
 
-  private FsInput in;
-  private DataFileReader<T> reader;
+  private FileReader<T> reader;
   private long start;
   private long end;
 
   public AvroRecordReader(JobConf job, FileSplit split)
     throws IOException {
-    this.in = new FsInput(split.getPath(), job);
-
-    Schema s = AvroJob.getInputSchema(job);
-    this.reader = new DataFileReader<T>(in, new SpecificDatumReader<T>(s));
+    this(new DataFileReader<T>
+         (new FsInput(split.getPath(), job),
+          new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
+         split);
+  }
 
+  protected AvroRecordReader(FileReader<T> reader, FileSplit split)
+    throws IOException {
+    this.reader = reader;
     reader.sync(split.getStart());                    // sync to start
-    this.start = in.tell();
+    this.start = reader.tell();
     this.end = split.getStart() + split.getLength();
   }
 
@@ -68,12 +71,12 @@ public class AvroRecordReader<T>
     if (end == start) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
     }
   }
   
   public long getPos() throws IOException {
-    return in.tell();
+    return reader.tell();
   }
 
   public void close() throws IOException { reader.close(); }

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileInputFormat.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/** An {@link org.apache.hadoop.mapred.InputFormat} for sequence files. */
+public class SequenceFileInputFormat<K,V>
+  extends FileInputFormat<AvroWrapper<Pair<K,V>>, NullWritable> {
+
+  @Override
+  public RecordReader<AvroWrapper<Pair<K,V>>, NullWritable>
+      getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    reporter.setStatus(split.toString());
+    return new SequenceFileRecordReader<K,V>(job, (FileSplit)split);
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.NoSuchElementException;
+import java.net.URI;
+import java.lang.reflect.Type;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.reflect.ReflectData;
+
+/** A {@link FileReader} for sequence files. */
+@SuppressWarnings(value="unchecked")
+public class SequenceFileReader<K,V> implements FileReader<Pair<K,V>> {
+  private SequenceFile.Reader reader;
+  private Schema schema;
+  private boolean ready = false;            // true iff done & key are current
+  private boolean done = false;             // true iff at EOF
+  private Writable key, spareKey, value;
+
+  private Converter<K> keyConverter =
+    new Converter<K>() { public K convert(Writable o) { return (K)o; } };
+
+  private Converter<V> valConverter =
+    new Converter<V>() { public V convert(Writable o) { return (V)o; } };
+
+  public SequenceFileReader(File file) throws IOException {
+    this(file.toURI(), new Configuration());
+  }
+
+  public SequenceFileReader(URI uri, Configuration c) throws IOException {
+    this(new SequenceFile.Reader(FileSystem.get(uri, c),
+                                 new Path(uri.toString()), c), c);
+  }
+
+  public SequenceFileReader(SequenceFile.Reader reader, Configuration conf) {
+    this.reader = reader;
+    this.schema =
+      Pair.getPairSchema(WritableData.get().getSchema(reader.getKeyClass()),
+                         WritableData.get().getSchema(reader.getValueClass()));
+    this.key =
+      (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+    this.spareKey =
+      (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+    this.value =
+      (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+    if (WRITABLE_CONVERTERS.containsKey(reader.getKeyClass()) )
+      keyConverter = WRITABLE_CONVERTERS.get(reader.getKeyClass());
+    if (WRITABLE_CONVERTERS.containsKey(reader.getValueClass()) )
+      valConverter = WRITABLE_CONVERTERS.get(reader.getValueClass());
+  }
+
+  @Override public void close() throws IOException { reader.close(); }
+
+  @Override public void remove() { throw new UnsupportedOperationException(); }
+
+  @Override public Iterator<Pair<K,V>> iterator() { return this; }
+
+  @Override public Schema getSchema() { return schema; }
+
+  private void prepare() throws IOException {
+    if (ready) return;
+    this.done = !reader.next(key);
+    ready = true;
+  }
+
+  @Override public boolean hasNext() {
+    try {
+      prepare();
+      return !done;
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  @Override public Pair<K,V> next() {
+    try {
+      return next(null);
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  @Override public Pair<K,V> next(Pair<K,V> reuse) throws IOException {
+    prepare();
+    if (!hasNext())
+      throw new NoSuchElementException();
+
+    Pair<K,V> result = reuse;
+    if (result == null)
+      result = new Pair<K,V>(schema);
+
+    result.key(keyConverter.convert(key));
+    reader.getCurrentValue(value);
+    result.value(valConverter.convert(value));
+
+    // swap key and spareKey
+    Writable k = key;
+    key = spareKey;
+    spareKey = k;
+
+    ready = false;
+
+    return result;
+  }
+
+  @Override public void sync(long position) throws IOException {
+    if (position > reader.getPosition())
+      reader.sync(position);
+    ready = false;
+  }
+  
+  @Override public boolean pastSync(long position) throws IOException {
+    return reader.getPosition() >= position && reader.syncSeen();
+  }
+
+  @Override public long tell() throws IOException {return reader.getPosition();}
+
+  private static final Map<Type,Schema> WRITABLE_SCHEMAS =
+    new HashMap<Type,Schema>();
+  static {
+    WRITABLE_SCHEMAS.put(NullWritable.class,
+                         Schema.create(Schema.Type.NULL));
+    WRITABLE_SCHEMAS.put(BooleanWritable.class,
+                         Schema.create(Schema.Type.BOOLEAN));
+    WRITABLE_SCHEMAS.put(IntWritable.class,
+                         Schema.create(Schema.Type.INT));
+    WRITABLE_SCHEMAS.put(LongWritable.class,
+                         Schema.create(Schema.Type.LONG));
+    WRITABLE_SCHEMAS.put(FloatWritable.class,
+                         Schema.create(Schema.Type.FLOAT));
+    WRITABLE_SCHEMAS.put(DoubleWritable.class,
+                         Schema.create(Schema.Type.DOUBLE));
+    WRITABLE_SCHEMAS.put(BytesWritable.class,
+                         Schema.create(Schema.Type.BYTES));
+    WRITABLE_SCHEMAS.put(Text.class,
+                         Schema.create(Schema.Type.STRING));
+  }
+
+  private static class WritableData extends ReflectData {
+    private static final WritableData INSTANCE = new WritableData();
+    protected WritableData() {}
+    
+    /** Return the singleton instance. */
+    public static WritableData get() { return INSTANCE; }
+
+    @Override public Schema getSchema(java.lang.reflect.Type type) {
+      if (WRITABLE_SCHEMAS.containsKey(type))
+        return WRITABLE_SCHEMAS.get(type);
+      else
+        return super.getSchema(type);
+    }
+  }
+
+  private interface Converter<T> {
+    T convert(Writable o);
+  }
+  
+  private static final Map<Type,Converter> WRITABLE_CONVERTERS =
+    new HashMap<Type,Converter>();
+  static {
+    WRITABLE_CONVERTERS.put
+      (NullWritable.class,
+       new Converter<Void>() {
+        public Void convert(Writable o) { return null; }
+      });
+    WRITABLE_CONVERTERS.put
+      (BooleanWritable.class,
+       new Converter<Boolean>() {
+        public Boolean convert(Writable o) {return ((BooleanWritable)o).get();}
+      });
+    WRITABLE_CONVERTERS.put
+      (IntWritable.class,
+       new Converter<Integer>() {
+        public Integer convert(Writable o) { return ((IntWritable)o).get(); }
+      });
+    WRITABLE_CONVERTERS.put
+      (LongWritable.class,
+       new Converter<Long>() {
+        public Long convert(Writable o) { return ((LongWritable)o).get(); }
+      });
+    WRITABLE_CONVERTERS.put
+      (FloatWritable.class,
+       new Converter<Float>() {
+        public Float convert(Writable o) { return ((FloatWritable)o).get(); }
+      });
+    WRITABLE_CONVERTERS.put
+      (DoubleWritable.class,
+       new Converter<Double>() {
+        public Double convert(Writable o) { return ((DoubleWritable)o).get(); }
+      });
+    WRITABLE_CONVERTERS.put
+      (BytesWritable.class,
+       new Converter<ByteBuffer>() {
+        public ByteBuffer convert(Writable o) {
+          BytesWritable b = (BytesWritable)o;
+          return ByteBuffer.wrap(b.getBytes(), 0, b.getLength());
+        }
+      });
+    WRITABLE_CONVERTERS.put
+      (Text.class,
+       new Converter<String>() {
+        public String convert(Writable o) { return o.toString(); }
+      });
+  }
+
+
+}
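
For context, this reader can also be used directly, outside MapReduce. A minimal sketch modeled on the checkFile loop in TestSequenceFileReader below (not part of this commit; the class name and path argument are illustrative assumptions), reading a local LongWritable/Text sequence file as Avro pairs:

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.mapred.Pair;
    import org.apache.avro.mapred.SequenceFileReader;

    public class SequenceFileReaderSketch {
      public static void main(String[] args) throws IOException {
        SequenceFileReader<Long,CharSequence> reader =
          new SequenceFileReader<Long,CharSequence>(new File(args[0]));
        try {
          for (Pair<Long,CharSequence> p : reader)      // Iterable: delegates to hasNext()/next()
            System.out.println(p.key() + "\t" + p.value());
        } finally {
          reader.close();
        }
      }
    }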

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/SequenceFileRecordReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+
+/** A {@link org.apache.hadoop.mapred.RecordReader} for sequence files. */
+public class SequenceFileRecordReader<K,V> extends AvroRecordReader<Pair<K,V>> {
+
+  public SequenceFileRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    super(new SequenceFileReader<K,V>(split.getPath().toUri(), job),
+          split);
+  }
+  
+}
+

Modified: avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java?rev=998354&r1=998353&r2=998354&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java Fri Sep 17 22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.generic.GenericDatumReader;
@@ -55,7 +56,7 @@ public class DataFileReadTool implements
     }
 
     GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
-    DataFileReader<Object> fileReader =
+    FileReader<Object> fileReader =
       new DataFileReader<Object>(new File(args.get(0)), reader);
     try {
       Schema schema = fileReader.getSchema();

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=998354&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Fri Sep 17 22:22:35 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.URI;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.specific.SpecificDatumReader;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileReader {
+  private static final int COUNT =
+    Integer.parseInt(System.getProperty("test.count", "10"));
+  private static final File DIR
+    = new File(System.getProperty("test.dir", "/tmp"));
+  private static final File FILE = new File(DIR, "test.seq");
+
+  @BeforeClass
+  public static void testWriteSequenceFile() throws IOException {
+    FILE.delete();
+    Configuration c = new Configuration();
+    URI uri = FILE.toURI();
+    SequenceFile.Writer writer
+      = new SequenceFile.Writer(FileSystem.get(uri, c), c,
+                                new Path(uri.toString()),
+                                LongWritable.class, Text.class);
+    final LongWritable key = new LongWritable();
+    final Text val = new Text();
+    for (int i = 0; i < COUNT; ++i) {
+      key.set(i);
+      val.set(Integer.toString(i));
+      writer.append(key, val);
+    }
+    writer.close();
+  }
+
+  @Test
+  public void testReadSequenceFile() throws Exception {
+    checkFile(new SequenceFileReader<Long,CharSequence>(FILE));
+  }
+
+  public void checkFile(FileReader<Pair<Long,CharSequence>> reader) throws Exception {
+    long i = 0;
+    for (Pair<Long,CharSequence> p : reader) {
+      assertEquals((Long)i, p.key());
+      assertEquals(Long.toString(i), p.value().toString());
+      i++;
+    }
+    assertEquals(COUNT, i);
+    reader.close();
+  }
+
+  @Test
+  public void testSequenceFileInputFormat() throws Exception {
+    JobConf job = new JobConf();
+    Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+    output.getFileSystem(job).delete(output);
+    
+    Schema schema = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+                                       Schema.create(Schema.Type.STRING));
+
+    AvroJob.setInputSequenceFile(job);
+
+    AvroJob.setInputSchema(job, schema);
+    AvroJob.setOutputSchema(job, schema);
+
+    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+    FileOutputFormat.setOutputPath(job, output);
+    
+    JobClient.runJob(job);
+
+    checkFile(new DataFileReader<Pair<Long,CharSequence>>
+              (new File(output.toString()+"/part-00000.avro"),
+               new SpecificDatumReader<Pair<Long,CharSequence>>()));
+  }
+
+}