You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/03 20:50:49 UTC
git commit: CRUNCH-119: Refactor the ReaderFactory classes for text
and sequence files, and add in the NLine and TextFileTable sources
Updated Branches:
refs/heads/master f26a7c731 -> 63050d0d4
CRUNCH-119: Refactor the ReaderFactory classes for text and sequence files, and add in the NLine and TextFileTable sources
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/63050d0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/63050d0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/63050d0d
Branch: refs/heads/master
Commit: 63050d0d49afc9d53bc3948270c49acbf337f3d9
Parents: f26a7c7
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 21 16:49:02 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 3 11:14:35 2012 -0800
----------------------------------------------------------------------
.../apache/crunch/io/CompositePathIterableIT.java | 6 +-
.../it/java/org/apache/crunch/io/NLineInputIT.java | 69 ++++++++
.../java/org/apache/crunch/io/TextFileTableIT.java | 56 +++++++
.../java/org/apache/crunch/io/ReadableSource.java | 14 ++
.../crunch/io/avro/AvroFileReaderFactory.java | 5 +-
.../org/apache/crunch/io/avro/AvroFileSource.java | 13 +-
.../apache/crunch/io/impl/AutoClosingIterator.java | 2 +-
.../apache/crunch/io/impl/FileTableSourceImpl.java | 4 +
.../apache/crunch/io/seq/SeqFileReaderFactory.java | 25 ++-
.../org/apache/crunch/io/seq/SeqFileSource.java | 2 +-
.../crunch/io/seq/SeqFileTableReaderFactory.java | 99 ------------
.../apache/crunch/io/seq/SeqFileTableSource.java | 6 +-
.../java/org/apache/crunch/io/text/LineParser.java | 125 +++++++++++++++
.../org/apache/crunch/io/text/NLineFileSource.java | 77 +++++++++
.../crunch/io/text/TextFileReaderFactory.java | 32 +---
.../org/apache/crunch/io/text/TextFileSource.java | 4 +-
.../apache/crunch/io/text/TextFileTableSource.java | 76 +++++++++
.../crunch/io/text/TextFileTableSourceTarget.java | 63 ++++++++
.../org/apache/crunch/io/text/TextFileTarget.java | 2 +-
.../crunch/io/avro/AvroFileReaderFactoryTest.java | 2 +-
20 files changed, 532 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
index 796b821..08d226d 100644
--- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
@@ -48,7 +48,7 @@ public class CompositePathIterableIT {
LocalFileSystem local = FileSystem.getLocal(conf);
Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
- new TextFileReaderFactory<String>(Writables.strings(), conf));
+ new TextFileReaderFactory<String>(Writables.strings()));
assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
@@ -62,7 +62,7 @@ public class CompositePathIterableIT {
LocalFileSystem local = FileSystem.getLocal(conf);
Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir,
- new TextFileReaderFactory<String>(Writables.strings(), conf));
+ new TextFileReaderFactory<String>(Writables.strings()));
assertTrue(Lists.newArrayList(iterable).isEmpty());
}
@@ -78,7 +78,7 @@ public class CompositePathIterableIT {
LocalFileSystem local = FileSystem.getLocal(conf);
CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>(
- Writables.strings(), conf));
+ Writables.strings()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
new file mode 100644
index 0000000..3b7abf6
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.NLineFileSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class NLineInputIT {
+ // Stages the test resource and supplies the pipeline's configuration.
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+ // Reads shakes.txt with 100 lines per task; the largest count emitted by LineCountFn must be 100.
+ @Test
+ public void testNLine() throws Exception {
+ String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+ Pipeline pipeline = new MRPipeline(NLineInputIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<String> shakespeare = pipeline.read(new NLineFileSource<String>(shakesInputPath,
+ Writables.strings(), 100));
+ assertEquals(Integer.valueOf(100),
+ shakespeare.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
+ }
+ // Counts the inputs seen between initialize() and cleanup(), then emits that single total.
+ private static class LineCountFn extends DoFn<String, Integer> {
+
+ private int lineCount = 0;
+
+ @Override
+ public void initialize() {
+ this.lineCount = 0;
+ }
+
+ @Override
+ public void process(String input, Emitter<Integer> emitter) {
+ lineCount++;
+ }
+
+ @Override
+ public void cleanup(Emitter<Integer> emitter) {
+ emitter.emit(lineCount);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
new file mode 100644
index 0000000..bddc0b5
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.apache.crunch.types.writable.Writables.*;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.TextFileTableSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Integration test: reads {@code urls.txt} through {@code TextFileTableSource} and checks how often each key occurs.
+ */
+public class TextFileTableIT {
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Test
+ public void testTextFileTable() throws Exception {
+ String urlsFile = tmpDir.copyResourceFileName("urls.txt");
+ Pipeline pipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration());
+ PTable<String, String> urls = pipeline.read(
+ new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings())));
+ Set<Pair<String, Long>> cnts = ImmutableSet.copyOf(urls.keys().count().materialize());
+ assertEquals(ImmutableSet.of(Pair.of("www.A.com", 4L), Pair.of("www.B.com", 2L),
+ Pair.of("www.C.com", 1L), Pair.of("www.D.com", 1L), Pair.of("www.E.com", 1L),
+ Pair.of("www.F.com", 2L)), cnts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
index 73a13a3..0407167 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -22,6 +22,20 @@ import java.io.IOException;
import org.apache.crunch.Source;
import org.apache.hadoop.conf.Configuration;
+/**
+ * An extension of the {@code Source} interface that indicates that a
+ * {@code Source} instance may be read as a series of records by the client
+ * code. This is used to determine whether a {@code PCollection} instance can be
+ * materialized.
+ */
public interface ReadableSource<T> extends Source<T> {
+
+ /**
+ * Returns an {@code Iterable} that contains the contents of this source.
+ *
+ * @param conf The current {@code Configuration} instance
+ * @return the contents of this {@code Source} as an {@code Iterable} instance
+ * @throws IOException
+ */
Iterable<T> read(Configuration conf) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index c8ab8b8..2f8c1e3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -33,7 +33,6 @@ import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,12 +45,10 @@ class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
private final DatumReader<T> recordReader;
private final MapFn<T, T> mapFn;
- private final Configuration conf;
- public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+ public AvroFileReaderFactory(AvroType<T> atype) {
this.recordReader = AvroFileReaderFactory.createDatumReader(atype);
this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
- this.conf = conf;
}
static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 2226556..32b8054 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -33,11 +33,16 @@ import org.apache.hadoop.fs.Path;
public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
- public AvroFileSource(Path path, AvroType<T> ptype) {
- super(path, ptype, new InputBundle(AvroInputFormat.class)
+ private static <S> InputBundle getBundle(AvroType<S> ptype) {
+ InputBundle bundle = new InputBundle(AvroInputFormat.class)
.set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
.set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
- .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
+ .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+ return bundle;
+ }
+
+ public AvroFileSource(Path path, AvroType<T> ptype) {
+ super(path, ptype, getBundle(ptype));
}
@Override
@@ -48,6 +53,6 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
@Override
public Iterable<T> read(Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
- return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf));
+ return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
index d58f290..3bd802e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -28,7 +28,7 @@ import com.google.common.io.Closeables;
* Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false. As long a client loops through to
* completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
*/
-public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable {
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
private final Iterator<T> iter;
private Closeable closeable;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index f6e8f1d..c7ea767 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -29,6 +29,10 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem
super(path, tableType, formatClass);
}
+ public FileTableSourceImpl(Path path, PTableType<K, V> tableType, InputBundle bundle) { // takes a ready-made InputBundle
+ super(path, tableType, bundle); // passed straight through to the FileSourceImpl constructor
+ }
+
@Override
public PTableType<K, V> getTableType() {
return (PTableType<K, V>) getType();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index aa5a00a..2f32746 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -25,8 +25,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@@ -40,23 +41,29 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
+ private final Converter converter;
private final MapFn<Object, T> mapFn;
private final Writable key;
private final Writable value;
- private final Configuration conf;
- public SeqFileReaderFactory(PType<T> ptype, Configuration conf) {
- this.mapFn = SeqFileHelper.getInputMapFn(ptype);
- this.key = NullWritable.get();
- this.value = SeqFileHelper.newInstance(ptype, conf);
- this.conf = conf;
+ public SeqFileReaderFactory(PType<T> ptype) {
+ this.converter = ptype.getConverter();
+ this.mapFn = ptype.getInputMapFn();
+ if (ptype instanceof PTableType) { // table types: key and value holders come from the component types
+ PTableType ptt = (PTableType) ptype;
+ this.key = SeqFileHelper.newInstance(ptt.getKeyType(), null); // NOTE(review): null replaces the Configuration passed before this change; confirm newInstance tolerates it
+ this.value = SeqFileHelper.newInstance(ptt.getValueType(), null);
+ } else { // any other type: NullWritable key, value holder built from ptype itself
+ this.key = NullWritable.get();
+ this.value = SeqFileHelper.newInstance(ptype, null);
+ }
+ }
@Override
public Iterator<T> read(FileSystem fs, final Path path) {
mapFn.initialize();
try {
- final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, fs.getConf());
return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
boolean nextChecked = false;
boolean hasNext = false;
@@ -82,7 +89,7 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
return null;
}
nextChecked = false;
- return mapFn.map(value);
+ return mapFn.map(converter.convertInput(key, value));
}
});
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index e8f3dcf..8fac4ae 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -37,7 +37,7 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc
@Override
public Iterable<T> read(Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
- return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
+ return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
deleted file mode 100644
index 67259fb..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K, V>> {
-
- private static final Log LOG = LogFactory.getLog(SeqFileTableReaderFactory.class);
-
- private final MapFn<Object, K> keyMapFn;
- private final MapFn<Object, V> valueMapFn;
- private final Writable key;
- private final Writable value;
- private final Configuration conf;
-
- public SeqFileTableReaderFactory(PTableType<K, V> tableType, Configuration conf) {
- PType<K> keyType = tableType.getKeyType();
- PType<V> valueType = tableType.getValueType();
- this.keyMapFn = SeqFileHelper.getInputMapFn(keyType);
- this.valueMapFn = SeqFileHelper.getInputMapFn(valueType);
- this.key = SeqFileHelper.newInstance(keyType, conf);
- this.value = SeqFileHelper.newInstance(valueType, conf);
- this.conf = conf;
- }
-
- @Override
- public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
- keyMapFn.initialize();
- valueMapFn.initialize();
- try {
- final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- return new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() {
- boolean nextChecked = false;
- boolean hasNext = false;
-
- @Override
- public boolean hasNext() {
- if (nextChecked == true) {
- return hasNext;
- }
- try {
- hasNext = reader.next(key, value);
- nextChecked = true;
- return hasNext;
- } catch (IOException e) {
- LOG.info("Error reading from path: " + path, e);
- return false;
- }
- }
-
- @Override
- public Pair<K, V> next() {
- if (!nextChecked && !hasNext()) {
- return null;
- }
- nextChecked = false;
- return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
- }
- });
- } catch (IOException e) {
- LOG.info("Could not read seqfile at path: " + path, e);
- return Iterators.emptyIterator();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index 56ed985..7a63272 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
/**
- *
+ * A {@code TableSource} that uses {@code SequenceFileInputFormat} to read the input
+ * file.
*/
public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implements ReadableSource<Pair<K, V>> {
@@ -45,7 +46,8 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
@Override
public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
- return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
+ return CompositePathIterable.create(fs, path,
+ new SeqFileReaderFactory<Pair<K, V>>(getTableType()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
new file mode 100644
index 0000000..9438014
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An abstraction for parsing the lines of a text file using a {@code PType<T>} to
+ * convert the lines of text into a given data type. Call {@link #initialize()}
+ * before the first {@link #parse(String)}; the underlying {@code MapFn} is only set there.
+ * @param <T> The type returned by the text parsing
+ */
+abstract class LineParser<T> {
+ /** Creates a parser that converts each whole line with the given {@code PType}. */
+ public static <S> LineParser<S> forType(PType<S> ptype) {
+ return new SimpleLineParser<S>(ptype);
+ }
+ /** Creates a parser that splits each line into key and value at the first {@code sep}. */
+ public static <K, V> LineParser<Pair<K, V>> forTableType(PTableType<K, V> ptt, String sep) {
+ return new KeyValueLineParser<K, V>(ptt, sep);
+ }
+
+ private MapFn<String, T> mapFn;
+ /** Obtains the {@code MapFn} from {@link #getMapFn()} and initializes it. */
+ public void initialize() {
+ mapFn = getMapFn();
+ mapFn.initialize();
+ }
+ /** Converts one line of text; requires a prior call to {@link #initialize()}. */
+ public T parse(String line) {
+ return mapFn.map(line);
+ }
+ /** Supplies the function that {@link #parse(String)} applies to each line. */
+ protected abstract MapFn<String, T> getMapFn();
+ /** Identity for String types; otherwise the PType's input MapFn, unwrapped to its second stage if composite. */
+ private static <T> MapFn<String, T> getMapFnForPType(PType<T> ptype) {
+ MapFn ret = null;
+ if (String.class.equals(ptype.getTypeClass())) {
+ ret = (MapFn) IdentityFn.getInstance();
+ } else {
+ // Check for a composite MapFn for the PType.
+ // Note that this won't work for Avro-- need to solve that.
+ ret = ptype.getInputMapFn();
+ if (ret instanceof CompositeMapFn) {
+ ret = ((CompositeMapFn) ret).getSecond();
+ }
+ }
+ return ret;
+ }
+ /** Parser for single-valued types: the whole line goes through the PType's function. */
+ private static class SimpleLineParser<S> extends LineParser<S> {
+
+ private final PType<S> ptype;
+
+ public SimpleLineParser(PType<S> ptype) {
+ this.ptype = ptype;
+ }
+
+ @Override
+ protected MapFn<String, S> getMapFn() {
+ return getMapFnForPType(ptype);
+ }
+ }
+ /** Parser for table types: each line is {@code key<sep>value}. */
+ private static class KeyValueLineParser<K, V> extends LineParser<Pair<K, V>> {
+
+ private final PTableType<K, V> ptt;
+ private final String sep;
+
+ public KeyValueLineParser(PTableType<K, V> ptt, String sep) {
+ this.ptt = ptt;
+ this.sep = sep;
+ }
+
+ @Override
+ protected MapFn<String, Pair<K, V>> getMapFn() {
+ final MapFn<String, K> keyMapFn = getMapFnForPType(ptt.getKeyType());
+ final MapFn<String, V> valueMapFn = getMapFnForPType(ptt.getValueType());
+
+ return new MapFn<String, Pair<K, V>>() {
+ @Override
+ public void initialize() {
+ keyMapFn.initialize();
+ valueMapFn.initialize();
+ }
+ // limit(2), not limit(1): the limit caps the number of pieces, and a key plus a value needs two.
+ @Override
+ public Pair<K, V> map(String input) {
+ List<String> kv = ImmutableList.copyOf(Splitter.on(sep).limit(2).split(input));
+ if (kv.size() != 2) {
+ throw new RuntimeException("Invalid input string: " + input);
+ }
+ return Pair.of(keyMapFn.map(kv.get(0)), valueMapFn.map(kv.get(1)));
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
new file mode 100644
index 0000000..d88ef4a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+
+/**
+ * A {@code Source} instance that uses the {@code NLineInputFormat}, which gives each map
+ * task a fraction of the lines in a text file as input. Most useful when running simulations
+ * on Hadoop, where each line represents configuration information about each simulation
+ * run.
+ */
+public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+ /** Builds the input settings: {@code NLineInputFormat} plus the requested lines-per-map value. */
+ private static InputBundle getBundle(int linesPerTask) {
+ String linesPerMap = String.valueOf(linesPerTask);
+ return new InputBundle(NLineInputFormat.class).set(NLineInputFormat.LINES_PER_MAP, linesPerMap);
+ }
+
+ /**
+ * Builds a source from a location given as text; delegates to the {@code Path} constructor.
+ *
+ * @param path The path to the input data, as a String
+ * @param ptype The PType to use for processing the data
+ * @param linesPerTask The number of lines from the input each map task will process
+ */
+ public NLineFileSource(String path, PType<T> ptype, int linesPerTask) {
+ this(new Path(path), ptype, linesPerTask);
+ }
+
+ /**
+ * Builds a source whose input bundle carries the lines-per-task setting.
+ *
+ * @param path The {@code Path} to the input data
+ * @param ptype The PType to use for processing the data
+ * @param linesPerTask The number of lines from the input each map task will process
+ */
+ public NLineFileSource(Path path, PType<T> ptype, int linesPerTask) {
+ super(path, ptype, getBundle(linesPerTask));
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("NLine(").append(path).append(")").toString();
+ }
+
+ @Override
+ public Iterable<T> read(Configuration conf) throws IOException {
+ TextFileReaderFactory<T> lineReaders = new TextFileReaderFactory<T>(LineParser.forType(ptype));
+ return CompositePathIterable.create(path.getFileSystem(conf), path, lineReaders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index a0c48e0..e1fea6e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -24,13 +24,9 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,28 +38,19 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class);
- private final PType<T> ptype;
- private final Configuration conf;
+ private final LineParser<T> parser;
- public TextFileReaderFactory(PType<T> ptype, Configuration conf) {
- this.ptype = ptype;
- this.conf = conf;
+ public TextFileReaderFactory(PType<T> ptype) {
+ this(LineParser.forType(ptype));
+ }
+
+ public TextFileReaderFactory(LineParser<T> parser) {
+ this.parser = parser;
}
@Override
public Iterator<T> read(FileSystem fs, Path path) {
- MapFn mapFn = null;
- if (String.class.equals(ptype.getTypeClass())) {
- mapFn = IdentityFn.getInstance();
- } else {
- // Check for a composite MapFn for the PType.
- // Note that this won't work for Avro-- need to solve that.
- MapFn input = ptype.getInputMapFn();
- if (input instanceof CompositeMapFn) {
- mapFn = ((CompositeMapFn) input).getSecond();
- }
- }
- mapFn.initialize();
+ parser.initialize();
FSDataInputStream is;
try {
@@ -74,7 +61,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
}
final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
- final MapFn<String, T> iterMapFn = mapFn;
return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
private String nextLine;
@@ -90,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
@Override
public T next() {
- return iterMapFn.map(nextLine);
+ return parser.parse(nextLine);
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
index ee51c04..026fca9 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
@Override
public Iterable<T> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype,
- conf));
+ return CompositePathIterable.create(path.getFileSystem(conf), path,
+ new TextFileReaderFactory<T>(LineParser.forType(ptype)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
new file mode 100644
index 0000000..c94676a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+
+/**
+ * A {@code Source} that uses the {@code KeyValueTextInputFormat} to process
+ * input text. If a separator for the keys and values in the text file is not specified,
+ * a tab character is used.
+ *
+ * <p>The separator is used in two places: it is stored in the {@code InputBundle} under
+ * {@code KeyValueLineRecordReader.KEY_VALUE_SEPERATOR} (spelled as in the referenced
+ * constant), and it is handed to {@code LineParser.forTableType} when the data is read
+ * through {@link #read(Configuration)}. All constructors funnel into the
+ * {@code (Path, PTableType, String)} constructor.
+ * NOTE(review): the separator is not null-checked in this class.
+ */
+public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
+ implements ReadableSource<Pair<K, V>> {
+
+ private static InputBundle getBundle(String sep) { // helper used by the constructor's super(...) call
+ InputBundle bundle = new InputBundle(KeyValueTextInputFormat.class);
+ bundle.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, sep);
+ return bundle;
+ }
+
+ private final String separator; // kept so read() can pass the same separator to the LineParser
+
+ public TextFileTableSource(String path, PTableType<K, V> tableType) {
+ this(new Path(path), tableType);
+ }
+
+ public TextFileTableSource(Path path, PTableType<K, V> tableType) {
+ this(path, tableType, "\t"); // default separator: a single tab
+ }
+
+ public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) {
+ this(new Path(path), tableType, separator);
+ }
+
+ public TextFileTableSource(Path path, PTableType<K, V> tableType, String separator) {
+ super(path, tableType, getBundle(separator));
+ this.separator = separator;
+ }
+
+ @Override
+ public String toString() {
+ return "KeyValueText(" + path + ")"; // the separator is not part of the label
+ }
+
+ @Override
+ public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
+ return CompositePathIterable.create(path.getFileSystem(conf), path,
+ new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator))); // each line becomes a Pair
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
new file mode 100644
index 0000000..bdc83a1
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code TableSource} and {@code SourceTarget} implementation that uses the
+ * {@code KeyValueTextInputFormat} and {@code TextOutputFormat} to support reading
+ * and writing text files as {@code PTable} instances using a tab separator for
+ * the keys and the values.
+ *
+ * <p>The source side is a {@code TextFileTableSource} built with the constructor that
+ * takes no separator argument, and the target side is a {@code TextFileTarget} for the
+ * same path. None of the constructors here accepts a separator. When no
+ * {@code FileNamingScheme} is supplied, a new {@code SequentialFileNamingScheme} is used.
+ */
+public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements
+ TableSource<K, V> {
+
+ private final PTableType<K, V> tableType; // returned as-is by getTableType()
+
+ public TextFileTableSourceTarget(String path, PTableType<K, V> tableType) {
+ this(new Path(path), tableType);
+ }
+
+ public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
+ this(path, tableType, new SequentialFileNamingScheme()); // default naming scheme
+ }
+
+ public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType,
+ FileNamingScheme fileNamingScheme) {
+ super(new TextFileTableSource<K, V>(path, tableType), new TextFileTarget(path),
+ fileNamingScheme); // source and target are both created for the same path
+ this.tableType = tableType;
+ }
+
+ @Override
+ public PTableType<K, V> getTableType() {
+ return tableType;
+ }
+
+ @Override
+ public String toString() {
+ return target.toString(); // target is not declared here; presumably inherited from ReadableSourcePathTargetImpl
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index c7e06d3..ec7d521 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -73,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl {
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
if (ptype instanceof PTableType) {
- return null;
+ return new TextFileTableSourceTarget(path, (PTableType) ptype);
}
return new TextFileSourceTarget<T>(path, ptype);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
index 66863ba..62085f8 100644
--- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -83,7 +83,7 @@ public class AvroFileReaderFactoryTest {
}
private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) {
- return new AvroFileReaderFactory<T>(avroType, new Configuration());
+ return new AvroFileReaderFactory<T>(avroType);
}
@Test