You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/03 20:50:49 UTC

git commit: CRUNCH-119: Refactor the ReaderFactory classes for text and sequence files, and add in the NLine and TextFileTable sources

Updated Branches:
  refs/heads/master f26a7c731 -> 63050d0d4


CRUNCH-119: Refactor the ReaderFactory classes for text and sequence files, and add in the NLine and TextFileTable sources


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/63050d0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/63050d0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/63050d0d

Branch: refs/heads/master
Commit: 63050d0d49afc9d53bc3948270c49acbf337f3d9
Parents: f26a7c7
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 21 16:49:02 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 3 11:14:35 2012 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/CompositePathIterableIT.java  |    6 +-
 .../it/java/org/apache/crunch/io/NLineInputIT.java |   69 ++++++++
 .../java/org/apache/crunch/io/TextFileTableIT.java |   56 +++++++
 .../java/org/apache/crunch/io/ReadableSource.java  |   14 ++
 .../crunch/io/avro/AvroFileReaderFactory.java      |    5 +-
 .../org/apache/crunch/io/avro/AvroFileSource.java  |   13 +-
 .../apache/crunch/io/impl/AutoClosingIterator.java |    2 +-
 .../apache/crunch/io/impl/FileTableSourceImpl.java |    4 +
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |   25 ++-
 .../org/apache/crunch/io/seq/SeqFileSource.java    |    2 +-
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |   99 ------------
 .../apache/crunch/io/seq/SeqFileTableSource.java   |    6 +-
 .../java/org/apache/crunch/io/text/LineParser.java |  125 +++++++++++++++
 .../org/apache/crunch/io/text/NLineFileSource.java |   77 +++++++++
 .../crunch/io/text/TextFileReaderFactory.java      |   32 +---
 .../org/apache/crunch/io/text/TextFileSource.java  |    4 +-
 .../apache/crunch/io/text/TextFileTableSource.java |   76 +++++++++
 .../crunch/io/text/TextFileTableSourceTarget.java  |   63 ++++++++
 .../org/apache/crunch/io/text/TextFileTarget.java  |    2 +-
 .../crunch/io/avro/AvroFileReaderFactoryTest.java  |    2 +-
 20 files changed, 532 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
index 796b821..08d226d 100644
--- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
@@ -48,7 +48,7 @@ public class CompositePathIterableIT {
     LocalFileSystem local = FileSystem.getLocal(conf);
 
     Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
-        new TextFileReaderFactory<String>(Writables.strings(), conf));
+        new TextFileReaderFactory<String>(Writables.strings()));
 
     assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
 
@@ -62,7 +62,7 @@ public class CompositePathIterableIT {
     LocalFileSystem local = FileSystem.getLocal(conf);
 
     Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir,
-        new TextFileReaderFactory<String>(Writables.strings(), conf));
+        new TextFileReaderFactory<String>(Writables.strings()));
 
     assertTrue(Lists.newArrayList(iterable).isEmpty());
   }
@@ -78,7 +78,7 @@ public class CompositePathIterableIT {
     LocalFileSystem local = FileSystem.getLocal(conf);
 
     CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>(
-        Writables.strings(), conf));
+        Writables.strings()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
new file mode 100644
index 0000000..3b7abf6
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.NLineFileSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Integration test for {@code NLineFileSource}: reads a text file through an
+ * {@code MRPipeline} and checks the largest number of lines handed to any
+ * single {@code LineCountFn} instance.
+ */
+public class NLineInputIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Reads {@code shakes.txt} with an {@code NLineFileSource} configured with
+   * the value 100 and asserts that the maximum per-instance line count
+   * emitted by {@code LineCountFn} is exactly 100.
+   */
+  @Test
+  public void testNLine() throws Exception {
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    Pipeline pipeline = new MRPipeline(NLineInputIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> shakespeare = pipeline.read(new NLineFileSource<String>(shakesInputPath,
+        Writables.strings(), 100));
+    // Integer.valueOf avoids allocating a fresh box the way "new Integer" does.
+    assertEquals(Integer.valueOf(100),
+        shakespeare.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
+  }
+
+  /**
+   * Counts the records passed to {@code process} and emits the total once,
+   * from {@code cleanup}; nothing is emitted per record.
+   */
+  private static class LineCountFn extends DoFn<String, Integer> {
+
+    private int lineCount = 0;
+
+    /** Resets the counter so each initialization starts from zero. */
+    @Override
+    public void initialize() {
+      this.lineCount = 0;
+    }
+
+    /** Counts the record; the emitter is intentionally unused here. */
+    @Override
+    public void process(String input, Emitter<Integer> emitter) {
+      lineCount++;
+    }
+
+    /** Emits the number of records counted since the last initialization. */
+    @Override
+    public void cleanup(Emitter<Integer> emitter) {
+      emitter.emit(lineCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
new file mode 100644
index 0000000..bddc0b5
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.apache.crunch.types.writable.Writables.*;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.TextFileTableSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Integration test that reads a text file as a table through
+ * {@code TextFileTableSource} and checks the number of records per key.
+ */
+public class TextFileTableIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Reads {@code urls.txt} as a {@code PTable<String, String>}, counts the
+   * occurrences of each key and compares them with the expected counts.
+   */
+  @Test
+  public void testTextFileTable() throws Exception {
+    String inputPath = tmpDir.copyResourceFileName("urls.txt");
+    Pipeline mrPipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration());
+    PTable<String, String> urlTable = mrPipeline.read(
+        new TextFileTableSource<String, String>(inputPath, tableOf(strings(), strings())));
+
+    Set<Pair<String, Long>> actual = ImmutableSet.copyOf(urlTable.keys().count().materialize());
+    Set<Pair<String, Long>> expected = ImmutableSet.of(
+        Pair.of("www.A.com", 4L),
+        Pair.of("www.B.com", 2L),
+        Pair.of("www.C.com", 1L),
+        Pair.of("www.D.com", 1L),
+        Pair.of("www.E.com", 1L),
+        Pair.of("www.F.com", 2L));
+
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
index 73a13a3..0407167 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -22,6 +22,20 @@ import java.io.IOException;
 import org.apache.crunch.Source;
 import org.apache.hadoop.conf.Configuration;
 
+/**
+ * An extension of the {@code Source} interface that indicates that a
+ * {@code Source} instance may be read as a series of records by the client
+ * code. This is used to determine whether a {@code PCollection} instance can be
+ * materialized.
+ */
 public interface ReadableSource<T> extends Source<T> {
+
+  /**
+   * Returns an {@code Iterable} that contains the contents of this source.
+   * 
+   * @param conf The current {@code Configuration} instance
+   * @return the contents of this {@code Source} as an {@code Iterable} instance
+   * @throws IOException
+   */
   Iterable<T> read(Configuration conf) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index c8ab8b8..2f8c1e3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -33,7 +33,6 @@ import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -46,12 +45,10 @@ class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private final DatumReader<T> recordReader;
   private final MapFn<T, T> mapFn;
-  private final Configuration conf;
 
-  public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+  public AvroFileReaderFactory(AvroType<T> atype) {
     this.recordReader = AvroFileReaderFactory.createDatumReader(atype);
     this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-    this.conf = conf;
   }
 
   static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 2226556..32b8054 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -33,11 +33,16 @@ import org.apache.hadoop.fs.Path;
 
 public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
-  public AvroFileSource(Path path, AvroType<T> ptype) {
-    super(path, ptype, new InputBundle(AvroInputFormat.class)
+  private static <S> InputBundle getBundle(AvroType<S> ptype) {
+    InputBundle bundle = new InputBundle(AvroInputFormat.class)
         .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
         .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
-        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
+        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    return bundle;
+  }
+  
+  public AvroFileSource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
   }
 
   @Override
@@ -48,6 +53,6 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf));
+    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
index d58f290..3bd802e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -28,7 +28,7 @@ import com.google.common.io.Closeables;
  * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false.  As long a client loops through to
  * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
  */
-public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable {
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
   private final Iterator<T> iter;
   private Closeable closeable;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index f6e8f1d..c7ea767 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -29,6 +29,10 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem
     super(path, tableType, formatClass);
   }
 
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, InputBundle bundle) {
+    super(path, tableType, bundle);
+  }
+  
   @Override
   public PTableType<K, V> getTableType() {
     return (PTableType<K, V>) getType();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index aa5a00a..2f32746 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -25,8 +25,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -40,23 +41,29 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
 
+  private final Converter converter;
   private final MapFn<Object, T> mapFn;
   private final Writable key;
   private final Writable value;
-  private final Configuration conf;
 
-  public SeqFileReaderFactory(PType<T> ptype, Configuration conf) {
-    this.mapFn = SeqFileHelper.getInputMapFn(ptype);
-    this.key = NullWritable.get();
-    this.value = SeqFileHelper.newInstance(ptype, conf);
-    this.conf = conf;
+  public SeqFileReaderFactory(PType<T> ptype) {
+    this.converter = ptype.getConverter();
+    this.mapFn = ptype.getInputMapFn();
+    if (ptype instanceof PTableType) {
+      PTableType ptt = (PTableType) ptype;
+      this.key = SeqFileHelper.newInstance(ptt.getKeyType(), null);
+      this.value = SeqFileHelper.newInstance(ptt.getValueType(), null);
+    } else {
+      this.key = NullWritable.get();
+      this.value = SeqFileHelper.newInstance(ptype, null);
+    }
   }
 
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
     mapFn.initialize();
     try {
-      final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, fs.getConf());
       return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
         boolean nextChecked = false;
         boolean hasNext = false;
@@ -82,7 +89,7 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
             return null;
           }
           nextChecked = false;
-          return mapFn.map(value);
+          return mapFn.map(converter.convertInput(key, value));
         }
       });
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index e8f3dcf..8fac4ae 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -37,7 +37,7 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
+    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
deleted file mode 100644
index 67259fb..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K, V>> {
-
-  private static final Log LOG = LogFactory.getLog(SeqFileTableReaderFactory.class);
-
-  private final MapFn<Object, K> keyMapFn;
-  private final MapFn<Object, V> valueMapFn;
-  private final Writable key;
-  private final Writable value;
-  private final Configuration conf;
-
-  public SeqFileTableReaderFactory(PTableType<K, V> tableType, Configuration conf) {
-    PType<K> keyType = tableType.getKeyType();
-    PType<V> valueType = tableType.getValueType();
-    this.keyMapFn = SeqFileHelper.getInputMapFn(keyType);
-    this.valueMapFn = SeqFileHelper.getInputMapFn(valueType);
-    this.key = SeqFileHelper.newInstance(keyType, conf);
-    this.value = SeqFileHelper.newInstance(valueType, conf);
-    this.conf = conf;
-  }
-
-  @Override
-  public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
-    keyMapFn.initialize();
-    valueMapFn.initialize();
-    try {
-      final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      return new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() {
-        boolean nextChecked = false;
-        boolean hasNext = false;
-
-        @Override
-        public boolean hasNext() {
-          if (nextChecked == true) {
-            return hasNext;
-          }
-          try {
-            hasNext = reader.next(key, value);
-            nextChecked = true;
-            return hasNext;
-          } catch (IOException e) {
-            LOG.info("Error reading from path: " + path, e);
-            return false;
-          }
-        }
-
-        @Override
-        public Pair<K, V> next() {
-          if (!nextChecked && !hasNext()) {
-            return null;
-          }
-          nextChecked = false;
-          return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
-        }
-      });
-    } catch (IOException e) {
-      LOG.info("Could not read seqfile at path: " + path, e);
-      return Iterators.emptyIterator();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index 56ed985..7a63272 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
 /**
- *
+ * A {@code TableSource} that uses {@code SequenceFileInputFormat} to read the input
+ * file.
  */
 public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implements ReadableSource<Pair<K, V>> {
 
@@ -45,7 +46,8 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
+    return CompositePathIterable.create(fs, path,
+        new SeqFileReaderFactory<Pair<K, V>>(getTableType()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
new file mode 100644
index 0000000..9438014
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An abstraction for parsing the lines of a text file using a {@code PType<T>} to
+ * convert the lines of text into a given data type. 
+ *
+ * @param <T> The type returned by the text parsing
+ */
+abstract class LineParser<T> {
+
+  /**
+   * Creates a parser that converts each whole line using a function derived
+   * from the given {@code PType}.
+   */
+  public static <S> LineParser<S> forType(PType<S> ptype) {
+    return new SimpleLineParser<S>(ptype);
+  }
+
+  /**
+   * Creates a parser that splits each line into a key and a value on the
+   * first occurrence of {@code sep} and converts the two parts using the key
+   * and value types of {@code ptt}.
+   */
+  public static <K, V> LineParser<Pair<K, V>> forTableType(PTableType<K, V> ptt, String sep) {
+    return new KeyValueLineParser<K, V>(ptt, sep);
+  }
+
+  // Assigned by initialize(); parse() must not be called before that.
+  private MapFn<String, T> mapFn;
+
+  /** Obtains the line-conversion function from the subclass and initializes it. */
+  public void initialize() {
+    mapFn = getMapFn();
+    mapFn.initialize();
+  }
+
+  /** Converts one line of text; {@link #initialize()} must have been called first. */
+  public T parse(String line) {
+    return mapFn.map(line);
+  }
+
+  /** Supplies the function that {@link #parse(String)} applies to each line. */
+  protected abstract MapFn<String, T> getMapFn();
+
+  /**
+   * Picks the string-to-value function for a {@code PType}: the identity
+   * function when the type class is {@code String}, otherwise the type's
+   * input function (or its second stage when that function is a
+   * {@code CompositeMapFn}).
+   */
+  private static <T> MapFn<String, T> getMapFnForPType(PType<T> ptype) {
+    MapFn ret = null;
+    if (String.class.equals(ptype.getTypeClass())) {
+      ret = (MapFn) IdentityFn.getInstance();
+    } else {
+      // Check for a composite MapFn for the PType.
+      // Note that this won't work for Avro-- need to solve that.
+      ret = ptype.getInputMapFn();
+      if (ret instanceof CompositeMapFn) {
+        ret = ((CompositeMapFn) ret).getSecond();
+      }
+    }
+    return ret;
+  }
+
+  /** Parser that converts the entire line into a single value. */
+  private static class SimpleLineParser<S> extends LineParser<S> {
+
+    private final PType<S> ptype;
+
+    public SimpleLineParser(PType<S> ptype) {
+      this.ptype = ptype;
+    }
+
+    @Override
+    protected MapFn<String, S> getMapFn() {
+      return getMapFnForPType(ptype);
+    }
+  }
+
+  /** Parser that splits a line into a key/value pair on a separator string. */
+  private static class KeyValueLineParser<K, V> extends LineParser<Pair<K, V>> {
+
+    private final PTableType<K, V> ptt;
+    private final String sep;
+
+    public KeyValueLineParser(PTableType<K, V> ptt, String sep) {
+      this.ptt = ptt;
+      this.sep = sep;
+    }
+
+    @Override
+    protected MapFn<String, Pair<K, V>> getMapFn() {
+      final MapFn<String, K> keyMapFn = getMapFnForPType(ptt.getKeyType());
+      final MapFn<String, V> valueMapFn = getMapFnForPType(ptt.getValueType());
+
+      return new MapFn<String, Pair<K, V>>() {
+        @Override
+        public void initialize() {
+          keyMapFn.initialize();
+          valueMapFn.initialize();
+        }
+
+        @Override
+        public Pair<K, V> map(String input) {
+          // Splitter.limit is the maximum number of items returned, so a
+          // limit of 2 splits on the first separator only and leaves any
+          // later separators inside the value. A limit of 1 never splits
+          // and would make the size check below fail for every line.
+          List<String> kv = ImmutableList.copyOf(Splitter.on(sep).limit(2).split(input));
+          if (kv.size() != 2) {
+            throw new RuntimeException("Invalid input string: " + input);
+          }
+          return Pair.of(keyMapFn.map(kv.get(0)), valueMapFn.map(kv.get(1)));
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
new file mode 100644
index 0000000..d88ef4a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+
+/**
+ * A {@code Source} built on the {@code NLineInputFormat}, which hands each map task a
+ * fraction of the lines of a text file as its input. It is most useful for running
+ * simulations on Hadoop, where each line holds the configuration of one simulation
+ * run.
+ */
+public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+  /** Creates the NLineInputFormat bundle with LINES_PER_MAP set to {@code linesPerTask}. */
+  private static InputBundle getBundle(int linesPerTask) {
+    InputBundle nlineBundle = new InputBundle(NLineInputFormat.class);
+    nlineBundle.set(NLineInputFormat.LINES_PER_MAP, Integer.toString(linesPerTask));
+    return nlineBundle;
+  }
+
+  /**
+   * Builds an {@code NLineFileSource} from a path given as a String.
+   *
+   * @param path The path to the input data, as a String
+   * @param ptype The PType to use for processing the data
+   * @param linesPerTask The number of lines from the input each map task will process
+   */
+  public NLineFileSource(String path, PType<T> ptype, int linesPerTask) {
+    this(new Path(path), ptype, linesPerTask);
+  }
+
+  /**
+   * Builds an {@code NLineFileSource} from a Hadoop {@code Path}.
+   *
+   * @param path The {@code Path} to the input data
+   * @param ptype The PType to use for processing the data
+   * @param linesPerTask The number of lines from the input each map task will process
+   */
+  public NLineFileSource(Path path, PType<T> ptype, int linesPerTask) {
+    super(path, ptype, getBundle(linesPerTask));
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("NLine(").append(path).append(")").toString();
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return CompositePathIterable.create(
+        path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index a0c48e0..e1fea6e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -24,13 +24,9 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,28 +38,19 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class);
 
-  private final PType<T> ptype;
-  private final Configuration conf;
+  private final LineParser<T> parser;
 
-  public TextFileReaderFactory(PType<T> ptype, Configuration conf) {
-    this.ptype = ptype;
-    this.conf = conf;
+  public TextFileReaderFactory(PType<T> ptype) {
+    this(LineParser.forType(ptype));
+  }
+  
+  public TextFileReaderFactory(LineParser<T> parser) {
+    this.parser = parser;
   }
 
   @Override
   public Iterator<T> read(FileSystem fs, Path path) {
-    MapFn mapFn = null;
-    if (String.class.equals(ptype.getTypeClass())) {
-      mapFn = IdentityFn.getInstance();
-    } else {
-      // Check for a composite MapFn for the PType.
-      // Note that this won't work for Avro-- need to solve that.
-      MapFn input = ptype.getInputMapFn();
-      if (input instanceof CompositeMapFn) {
-        mapFn = ((CompositeMapFn) input).getSecond();
-      }
-    }
-    mapFn.initialize();
+    parser.initialize();
 
     FSDataInputStream is;
     try {
@@ -74,7 +61,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
     }
 
     final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-    final MapFn<String, T> iterMapFn = mapFn;
     return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
       private String nextLine;
 
@@ -90,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
 
       @Override
       public T next() {
-        return iterMapFn.map(nextLine);
+        return parser.parse(nextLine);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
index ee51c04..026fca9 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype,
-        conf));
+    return CompositePathIterable.create(path.getFileSystem(conf), path,
+        new TextFileReaderFactory<T>(LineParser.forType(ptype)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
new file mode 100644
index 0000000..c94676a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+
+/**
+ * A {@code Source} that processes input text with the {@code KeyValueTextInputFormat}.
+ * Keys and values are split on a tab character unless another separator is specified.
+ */
+public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
+    implements ReadableSource<Pair<K, V>> {
+
+  private final String separator;
+
+  private static InputBundle getBundle(String sep) {
+    InputBundle kvBundle = new InputBundle(KeyValueTextInputFormat.class);
+    kvBundle.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, sep);
+    return kvBundle;
+  }
+
+  public TextFileTableSource(String path, PTableType<K, V> tableType) {
+    this(new Path(path), tableType);
+  }
+
+  /** Reads {@code path} with a tab as the key/value separator. */
+  public TextFileTableSource(Path path, PTableType<K, V> tableType) {
+    this(path, tableType, "\t");
+  }
+
+  public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) {
+    this(new Path(path), tableType, separator);
+  }
+
+  public TextFileTableSource(Path path, PTableType<K, V> tableType, String separator) {
+    super(path, tableType, getBundle(separator));
+    this.separator = separator;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("KeyValueText(").append(path).append(")").toString();
+  }
+
+  @Override
+  public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
+    return CompositePathIterable.create(path.getFileSystem(conf), path,
+        new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
new file mode 100644
index 0000000..bdc83a1
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code TableSource} and {@code SourceTarget} that relies on the
+ * {@code KeyValueTextInputFormat} and {@code TextOutputFormat} to read and write
+ * text files as {@code PTable} instances, with a tab separating each key from
+ * its value.
+ */
+public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>>
+    implements TableSource<K, V> {
+
+  private final PTableType<K, V> tableType;
+
+  public TextFileTableSourceTarget(String path, PTableType<K, V> tableType) {
+    this(new Path(path), tableType, new SequentialFileNamingScheme());
+  }
+
+  public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
+    this(path, tableType, new SequentialFileNamingScheme());
+  }
+
+  public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType,
+      FileNamingScheme fileNamingScheme) {
+    super(new TextFileTableSource<K, V>(path, tableType),
+        new TextFileTarget(path), fileNamingScheme);
+    this.tableType = tableType;
+  }
+
+  @Override
+  public PTableType<K, V> getTableType() {
+    return this.tableType;
+  }
+
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index c7e06d3..ec7d521 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -73,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl {
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof PTableType) {
-      return null;
+      return new TextFileTableSourceTarget(path, (PTableType) ptype);
     }
     return new TextFileSourceTarget<T>(path, ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
index 66863ba..62085f8 100644
--- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -83,7 +83,7 @@ public class AvroFileReaderFactoryTest {
   }
 
   private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) {
-    return new AvroFileReaderFactory<T>(avroType, new Configuration());
+    return new AvroFileReaderFactory<T>(avroType);
   }
 
   @Test