You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/13 20:20:56 UTC

git commit: CRUNCH-97: Add support for parsing structured records out of text files to crunch-contrib.

Updated Branches:
  refs/heads/master 4a3aa0dfd -> ac317a185


CRUNCH-97: Add support for parsing structured records out of text files to crunch-contrib.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/ac317a18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/ac317a18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/ac317a18

Branch: refs/heads/master
Commit: ac317a185401cb0ef1afe7096b51ac29106eb1e2
Parents: 4a3aa0d
Author: Josh Wills <jw...@apache.org>
Authored: Mon Oct 15 20:04:18 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Dec 13 11:18:14 2012 -0800

----------------------------------------------------------------------
 .../contrib/text/AbstractCompositeExtractor.java   |  100 +++
 .../contrib/text/AbstractSimpleExtractor.java      |   97 +++
 .../org/apache/crunch/contrib/text/Extractor.java  |   67 ++
 .../apache/crunch/contrib/text/ExtractorStats.java |   59 ++
 .../org/apache/crunch/contrib/text/Extractors.java |  548 +++++++++++++++
 .../java/org/apache/crunch/contrib/text/Parse.java |  132 ++++
 .../org/apache/crunch/contrib/text/Tokenizer.java  |  135 ++++
 .../crunch/contrib/text/TokenizerFactory.java      |  173 +++++
 .../org/apache/crunch/contrib/text/ParseTest.java  |  120 ++++
 9 files changed, 1431 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
new file mode 100644
index 0000000..f26fc57
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for {@code Extractor} instances that delegates the parsing of fields to other
+ * {@code Extractor} instances, primarily used for constructing composite records that implement
+ * the {@code Tuple} interface.
+ */
+public abstract class AbstractCompositeExtractor<T> implements Extractor<T> {
+
+  private final TokenizerFactory tokenizerFactory;
+  private int errors = 0;
+  private boolean errorOnLast;
+  private final List<Extractor<?>> extractors;
+  
+  public AbstractCompositeExtractor(TokenizerFactory scannerFactory, List<Extractor<?>> extractors) {
+    Preconditions.checkArgument(extractors.size() > 0);
+    this.tokenizerFactory = scannerFactory;
+    this.extractors = extractors;
+  }
+
+  @Override
+  public T extract(String input) {
+    errorOnLast = false;
+    Tokenizer tokenizer = tokenizerFactory.create(input);
+    Object[] values = new Object[extractors.size()];
+    try {
+      for (int i = 0; i < values.length; i++) {
+        values[i] = extractors.get(i).extract(tokenizer.next());
+        if (extractors.get(i).errorOnLastRecord() && !errorOnLast) {
+          errors++;
+          errorOnLast = true;
+        }
+      }
+    } catch (Exception e) {
+      if (!errorOnLast) {
+        errors++;
+        errorOnLast = true;
+      }
+      return getDefaultValue();
+    }
+    
+    return doCreate(values);
+  }
+  
+  @Override
+  public void initialize() {
+    this.errors = 0;
+    this.errorOnLast = false;
+    for (Extractor<?> x : extractors) {
+      x.initialize();
+    }
+  }
+  
+  @Override
+  public boolean errorOnLastRecord() {
+    return errorOnLast;
+  }
+  
+  @Override
+  public ExtractorStats getStats() {
+    return new ExtractorStats(errors, Lists.transform(extractors, new Function<Extractor<?>, Integer>() {
+      @Override
+      public Integer apply(Extractor<?> input) {
+        return input.getStats().getErrorCount();
+      }
+    }));
+  }
+  
+  /**
+   * Subclasses should return a new instance of the object based on the fields parsed by
+   * the {@code Extractor} instances for this composite {@code Extractor} instance.
+   * 
+   * @param values The values that were extracted by the component {@code Extractor} objects
+   * @return A new instance of the composite class for this {@code Extractor}
+   */
+  protected abstract T doCreate(Object[] values);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
new file mode 100644
index 0000000..9959b44
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for the common case of {@code Extractor} instances that construct a single
+ * object from a block of text stored in a {@code String}. Exceptions thrown while parsing
+ * are caught, counted, logged (up to a fixed limit) and replaced by a default value.
+ */
+public abstract class AbstractSimpleExtractor<T> implements Extractor<T> {
+
+  private static final Log LOG = LogFactory.getLog(AbstractSimpleExtractor.class);
+
+  /** Maximum number of parse failures written to the log; later failures are only counted. */
+  private static final int LOG_ERROR_LIMIT = 100;
+
+  private int errors;
+  private boolean errorOnLast;
+  private final T defaultValue;
+  private final TokenizerFactory scannerFactory;
+
+  /**
+   * Creates an extractor that tokenizes its input with the default {@code TokenizerFactory}.
+   *
+   * @param defaultValue The value returned by {@code extract} when parsing throws
+   */
+  public AbstractSimpleExtractor(T defaultValue) {
+    this(defaultValue, TokenizerFactory.getDefaultInstance());
+  }
+
+  /**
+   * @param defaultValue The value returned by {@code extract} when parsing throws
+   * @param scannerFactory Creates the {@code Tokenizer} handed to {@code doExtract}
+   */
+  public AbstractSimpleExtractor(T defaultValue, TokenizerFactory scannerFactory) {
+    this.defaultValue = defaultValue;
+    this.scannerFactory = scannerFactory;
+  }
+
+  /**
+   * Resets the error counter and the last-record error flag.
+   */
+  @Override
+  public void initialize() {
+    this.errors = 0;
+    this.errorOnLast = false;
+  }
+
+  /**
+   * Parses {@code input} via {@link #doExtract(Tokenizer)}. Any exception is recorded as
+   * an error for this record and the default value is returned in place of a parsed one.
+   */
+  @Override
+  public T extract(String input) {
+    errorOnLast = false;
+    T res = defaultValue;
+    try {
+      res = doExtract(scannerFactory.create(input));
+    } catch (Exception e) {
+      errorOnLast = true;
+      errors++;
+      // errors has already been incremented for this failure, so the comparison must be
+      // inclusive for exactly the first LOG_ERROR_LIMIT failures to reach the log.
+      if (errors <= LOG_ERROR_LIMIT) {
+        String msg = String.format("Error occurred parsing input '%s' using extractor %s",
+            input, this);
+        LOG.error(msg, e);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public boolean errorOnLastRecord() {
+    return errorOnLast;
+  }
+
+  @Override
+  public T getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * Returns the number of inputs whose parsing threw since the last {@code initialize()}.
+   */
+  @Override
+  public ExtractorStats getStats() {
+    return new ExtractorStats(errors);
+  }
+
+  /**
+   * Subclasses must override this method to return a new instance of the
+   * class that this {@code Extractor} instance is designed to parse.
+   * <p>Any runtime parsing exceptions from the given {@code Tokenizer} instance
+   * should be thrown so that they may be caught by the error handling logic
+   * inside of this class.
+   *
+   * @param tokenizer The {@code Tokenizer} instance for the current record
+   * @return A new instance of the type defined for this class
+   */
+  protected abstract T doExtract(Tokenizer tokenizer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
new file mode 100644
index 0000000..1e7f6b7
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.io.Serializable;
+
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * An interface for extracting a specific data type from a text string, such as a
+ * token produced by a {@code Tokenizer}.
+ *
+ * @param <T> The data type to be extracted
+ */
+public interface Extractor<T> extends Serializable {
+  
+  /**
+   * Extract a value with the type of this instance.
+   *
+   * @param input The text to parse
+   * @return The extracted value
+   */
+  T extract(String input);
+  
+  /**
+   * Returns the {@code PType} associated with this data type for the
+   * given {@code PTypeFamily}.
+   *
+   * @param ptf The type family from which the {@code PType} is obtained
+   */
+  PType<T> getPType(PTypeFamily ptf);
+  
+  /**
+   * Returns the default value for this {@code Extractor} in case of an
+   * error.
+   */
+  T getDefaultValue();
+  
+  /**
+   * Perform any initialization required by this {@code Extractor} during the
+   * start of a map or reduce task. The implementations in this package also
+   * reset their error counters here.
+   */
+  void initialize();
+  
+  /**
+   * Returns true if the last call to {@code extract} on this instance
+   * encountered a parsing error that was handled rather than propagated.
+   */
+  boolean errorOnLastRecord();
+  
+  /**
+   * Return statistics about how many errors this {@code Extractor} instance
+   * encountered while parsing input data.
+   */
+  ExtractorStats getStats();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
new file mode 100644
index 0000000..7dc37f4
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Records the number and kind of errors that an {@code Extractor} encountered when parsing
+ * input data. Instances are immutable snapshots taken at construction time.
+ */
+public class ExtractorStats {
+
+  private final int errorCount;
+  private final List<Integer> fieldErrors;
+
+  /**
+   * Creates statistics for an extractor that has no per-field error counts.
+   *
+   * @param errorCount The overall number of records that had a parsing error
+   */
+  public ExtractorStats(int errorCount) {
+    this(errorCount, ImmutableList.<Integer>of());
+  }
+
+  /**
+   * @param errorCount The overall number of records that had a parsing error
+   * @param fieldErrors The error counts of the individual fields; {@code null} is treated
+   *        as an empty list, and the elements themselves must not be {@code null}
+   */
+  public ExtractorStats(int errorCount, List<Integer> fieldErrors) {
+    this.errorCount = errorCount;
+    // Copy the list: callers may hand in a lazily evaluated or mutable view whose contents
+    // would otherwise keep changing after these statistics have been created.
+    if (fieldErrors == null) {
+      this.fieldErrors = ImmutableList.<Integer>of();
+    } else {
+      this.fieldErrors = ImmutableList.copyOf(fieldErrors);
+    }
+  }
+
+  /**
+   * The overall number of records that had some kind of parsing error.
+   * @return The overall number of records that had some kind of parsing error
+   */
+  public int getErrorCount() {
+    return errorCount;
+  }
+
+  /**
+   * Returns the number of errors that occurred when parsing the individual fields of
+   * a composite record type, like a {@code Pair} or {@code TupleN}.
+   * @return An unmodifiable list with the number of errors that occurred when parsing
+   * the individual fields of a composite record type; empty if there are none
+   */
+  public List<Integer> getFieldErrors() {
+    return fieldErrors;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
new file mode 100644
index 0000000..0ed1282
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Scanner;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Factory methods for constructing common {@code Extractor} types.
+ */
+public class Extractors {
+  
+  /**
+   * Returns an Extractor for integers that uses {@code 0} as its default value.
+   */
+  public static Extractor<Integer> xint() {
+    return xint(0);
+  }
+
+  /**
+   * Returns an Extractor for integers.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<Integer> xint(Integer defaultValue) {
+    return new IntExtractor(defaultValue);
+  }
+
+  /**
+   * Returns an Extractor for longs that uses {@code 0L} as its default value.
+   */
+  public static Extractor<Long> xlong() {
+    return xlong(0L);
+  }
+  
+  /**
+   * Returns an Extractor for longs.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<Long> xlong(Long defaultValue) {
+    return new LongExtractor(defaultValue);
+  }
+  
+  /**
+   * Returns an Extractor for floats that uses {@code 0f} as its default value.
+   */
+  public static Extractor<Float> xfloat() {
+    return xfloat(0f);
+  }
+  
+  /**
+   * Returns an Extractor for floats.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<Float> xfloat(Float defaultValue) {
+    return new FloatExtractor(defaultValue);
+  }
+  
+  /**
+   * Returns an Extractor for doubles that uses {@code 0.0} as its default value.
+   */
+  public static Extractor<Double> xdouble() {
+    return xdouble(0.0);
+  }
+
+  /**
+   * Returns an Extractor for doubles.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<Double> xdouble(Double defaultValue) {
+    return new DoubleExtractor(defaultValue);
+  }
+  
+  /**
+   * Returns an Extractor for booleans that uses {@code false} as its default value.
+   */
+  public static Extractor<Boolean> xboolean() {
+    return xboolean(false);
+  }
+
+  /**
+   * Returns an Extractor for booleans.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<Boolean> xboolean(Boolean defaultValue) {
+    return new BooleanExtractor(defaultValue);
+  }
+
+  /**
+   * Returns an Extractor for strings that uses the empty string as its default value.
+   */
+  public static Extractor<String> xstring() {
+    return xstring("");
+  }
+
+  /**
+   * Returns an Extractor for strings.
+   *
+   * @param defaultValue The value the Extractor returns when parsing an input throws
+   */
+  public static Extractor<String> xstring(String defaultValue) {
+    return new StringExtractor(defaultValue);
+  }
+
+  /**
+   * Returns an Extractor that splits its input with a {@code Tokenizer} created by the given
+   * {@code TokenizerFactory}, applies the given Extractor to every token, and collects the
+   * results.
+   *
+   * @param scannerFactory Creates the {@code Tokenizer} used to split each input
+   * @param extractor The Extractor applied to each token
+   */
+  public static <T> Extractor<Collection<T>> xcollect(TokenizerFactory scannerFactory, Extractor<T> extractor) {
+    return new CollectionExtractor<T>(scannerFactory, extractor);
+  }
+  
+  /**
+   * Returns an Extractor for pairs of the given types that uses the given {@code TokenizerFactory}
+   * for parsing the sub-fields.
+   *
+   * @param scannerFactory Creates the {@code Tokenizer} that splits each input into sub-fields
+   * @param one The Extractor for the first sub-field
+   * @param two The Extractor for the second sub-field
+   */
+  public static <K, V> Extractor<Pair<K, V>> xpair(TokenizerFactory scannerFactory,
+      Extractor<K> one, Extractor<V> two) {
+    return new PairExtractor<K, V>(scannerFactory, one, two);
+  }
+  
+  /**
+   * Returns an Extractor for triples of the given types that uses the given {@code TokenizerFactory}
+   * for parsing the sub-fields.
+   *
+   * @param scannerFactory Creates the {@code Tokenizer} that splits each input into sub-fields
+   * @param a The Extractor for the first sub-field
+   * @param b The Extractor for the second sub-field
+   * @param c The Extractor for the third sub-field
+   */
+  public static <A, B, C> Extractor<Tuple3<A, B, C>> xtriple(TokenizerFactory scannerFactory, Extractor<A> a,
+      Extractor<B> b, Extractor<C> c) {
+    return new TripExtractor<A, B, C>(scannerFactory, a, b, c);
+  }
+  
+  /**
+   * Returns an Extractor for quads of the given types that uses the given {@code TokenizerFactory}
+   * for parsing the sub-fields.
+   *
+   * @param scannerFactory Creates the {@code Tokenizer} that splits each input into sub-fields
+   * @param a The Extractor for the first sub-field
+   * @param b The Extractor for the second sub-field
+   * @param c The Extractor for the third sub-field
+   * @param d The Extractor for the fourth sub-field
+   */
+  public static <A, B, C, D> Extractor<Tuple4<A, B, C, D>> xquad(TokenizerFactory scannerFactory, Extractor<A> a,
+      Extractor<B> b, Extractor<C> c, Extractor<D> d) {
+    return new QuadExtractor<A, B, C, D>(scannerFactory, a, b, c, d);
+  }
+  
+  /**
+   * Returns an Extractor for an arbitrary number of types that uses the given {@code TokenizerFactory}
+   * for parsing the sub-fields.
+   *
+   * @param scannerFactory Creates the {@code Tokenizer} that splits each input into sub-fields
+   * @param extractors The Extractors for the sub-fields, in field order; at least one is required
+   */
+  public static Extractor<TupleN> xtupleN(TokenizerFactory scannerFactory, Extractor...extractors) {
+    return new TupleNExtractor(scannerFactory, extractors);
+  }
+  
+  /**
+   * Returns an Extractor for a subclass of {@code Tuple} with a constructor that
+   * has the given extractor types that uses the given {@code TokenizerFactory}
+   * for parsing the sub-fields.
+   *
+   * @param clazz The {@code Tuple} subclass to instantiate; its constructor is looked up
+   *        reflectively from the type classes of the given Extractors, in order
+   * @param scannerFactory Creates the {@code Tokenizer} that splits each input into sub-fields
+   * @param extractors The Extractors for the sub-fields, in field order; at least one is required
+   */
+  public static <T extends Tuple> Extractor<T> xcustom(Class<T> clazz, TokenizerFactory scannerFactory, Extractor... extractors) {
+    return new CustomTupleExtractor<T>(scannerFactory, clazz, extractors);
+  }
+  
+  /** Simple extractor that reads the next integer from the tokenizer. */
+  private static class IntExtractor extends AbstractSimpleExtractor<Integer> {
+
+    public IntExtractor(Integer defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xint";
+    }
+
+    @Override
+    public PType<Integer> getPType(PTypeFamily ptf) {
+      return ptf.ints();
+    }
+
+    /** Parse failures are thrown and handled by the base class. */
+    @Override
+    protected Integer doExtract(Tokenizer tokenizer) {
+      final Integer parsed = tokenizer.nextInt();
+      return parsed;
+    }
+  }
+  
+  /** Simple extractor that reads the next long from the tokenizer. */
+  private static class LongExtractor extends AbstractSimpleExtractor<Long> {
+
+    public LongExtractor(Long defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xlong";
+    }
+
+    @Override
+    public PType<Long> getPType(PTypeFamily ptf) {
+      return ptf.longs();
+    }
+
+    /** Parse failures are thrown and handled by the base class. */
+    @Override
+    protected Long doExtract(Tokenizer tokenizer) {
+      final Long parsed = tokenizer.nextLong();
+      return parsed;
+    }
+  }
+  
+  /** Simple extractor that reads the next float from the tokenizer. */
+  private static class FloatExtractor extends AbstractSimpleExtractor<Float> {
+
+    public FloatExtractor(Float defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xfloat";
+    }
+
+    @Override
+    public PType<Float> getPType(PTypeFamily ptf) {
+      return ptf.floats();
+    }
+
+    /** Parse failures are thrown and handled by the base class. */
+    @Override
+    protected Float doExtract(Tokenizer tokenizer) {
+      final Float parsed = tokenizer.nextFloat();
+      return parsed;
+    }
+  }
+  
+  /** Simple extractor that reads the next double from the tokenizer. */
+  private static class DoubleExtractor extends AbstractSimpleExtractor<Double> {
+
+    public DoubleExtractor(Double defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xdouble";
+    }
+
+    @Override
+    public PType<Double> getPType(PTypeFamily ptf) {
+      return ptf.doubles();
+    }
+
+    /** Parse failures are thrown and handled by the base class. */
+    @Override
+    protected Double doExtract(Tokenizer tokenizer) {
+      final Double parsed = tokenizer.nextDouble();
+      return parsed;
+    }
+  }
+  
+  /** Simple extractor that reads the next boolean from the tokenizer. */
+  private static class BooleanExtractor extends AbstractSimpleExtractor<Boolean> {
+
+    public BooleanExtractor(Boolean defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xboolean";
+    }
+
+    @Override
+    public PType<Boolean> getPType(PTypeFamily ptf) {
+      return ptf.booleans();
+    }
+
+    /** Parse failures are thrown and handled by the base class. */
+    @Override
+    protected Boolean doExtract(Tokenizer tokenizer) {
+      final Boolean parsed = tokenizer.nextBoolean();
+      return parsed;
+    }
+  }
+  
+  /** Simple extractor that returns the next token from the tokenizer as a string. */
+  private static class StringExtractor extends AbstractSimpleExtractor<String> {
+
+    public StringExtractor(String defaultValue) {
+      super(defaultValue);
+    }
+
+    @Override
+    public String toString() {
+      return "xstring";
+    }
+
+    @Override
+    public PType<String> getPType(PTypeFamily ptf) {
+      return ptf.strings();
+    }
+
+    /** Failures are thrown and handled by the base class. */
+    @Override
+    protected String doExtract(Tokenizer tokenizer) {
+      final String token = tokenizer.next();
+      return token;
+    }
+  }
+  
+  /**
+   * Extractor that splits its input into tokens and applies a single element
+   * {@code Extractor} to every token, collecting the results in encounter order.
+   */
+  private static class CollectionExtractor<T> implements Extractor<Collection<T>> {
+
+    private final TokenizerFactory tokenizerFactory;
+    private final Extractor<T> extractor;
+    private int errors = 0;
+    private boolean errorOnLast;
+
+    public CollectionExtractor(TokenizerFactory tokenizerFactory, Extractor<T> extractor) {
+      this.tokenizerFactory = tokenizerFactory;
+      this.extractor = extractor;
+    }
+
+    /**
+     * Extracts one element per token of {@code input}. The record is counted as an error
+     * at most once, no matter how many of its elements the element extractor failed on.
+     */
+    @Override
+    public Collection<T> extract(String input) {
+      errorOnLast = false;
+      Tokenizer tokenizer = tokenizerFactory.create(input);
+      Collection<T> parsed = Lists.newArrayList();
+      while (tokenizer.hasNext()) {
+        parsed.add(extractor.extract(tokenizer.next()));
+        if (extractor.errorOnLastRecord() && !errorOnLast) {
+          errorOnLast = true;
+          errors++;
+        }
+      }
+      return parsed;
+    }
+
+    @Override
+    public PType<Collection<T>> getPType(PTypeFamily ptf) {
+      return ptf.collections(extractor.getPType(ptf));
+    }
+
+    /** The default value is an empty, immutable collection. */
+    @Override
+    public Collection<T> getDefaultValue() {
+      return ImmutableList.<T>of();
+    }
+
+    /**
+     * Reports the number of records with at least one failed element, plus the error
+     * count of the element extractor as the single per-field entry.
+     */
+    @Override
+    public ExtractorStats getStats() {
+      return new ExtractorStats(errors,
+          ImmutableList.of(extractor.getStats().getErrorCount()));
+    }
+
+    @Override
+    public void initialize() {
+      this.errorOnLast = false;
+      this.errors = 0;
+      extractor.initialize();
+    }
+
+    @Override
+    public boolean errorOnLastRecord() {
+      return errorOnLast;
+    }
+
+    /**
+     * Names this extractor after its factory method, like the other extractors in this
+     * class, so that it reads sensibly inside log messages and enclosing extractors.
+     */
+    @Override
+    public String toString() {
+      return "xcollect(" + extractor + ")";
+    }
+  }
+  
+  private static class PairExtractor<K, V> extends AbstractCompositeExtractor<Pair<K, V>> {
+    private final Extractor<K> one;
+    private final Extractor<V> two;
+    
+    public PairExtractor(TokenizerFactory scannerFactory, Extractor<K> one, Extractor<V> two) {
+      super(scannerFactory, ImmutableList.<Extractor<?>>of(one, two));
+      this.one = one;
+      this.two = two;
+    }
+
+    @Override
+    protected Pair<K, V> doCreate(Object[] values) {
+      return Pair.of((K) values[0], (V) values[1]);
+    }
+    
+    @Override
+    public PType<Pair<K, V>> getPType(PTypeFamily ptf) {
+      return ptf.pairs(one.getPType(ptf), two.getPType(ptf));
+    }
+    
+    @Override
+    public String toString() {
+      return "xpair(" + one + "," + two + ")";
+    }
+
+    @Override
+    public Pair<K, V> getDefaultValue() {
+      return Pair.of(one.getDefaultValue(), two.getDefaultValue());
+    }
+  };
+  
+  private static class TripExtractor<A, B, C> extends AbstractCompositeExtractor<Tuple3<A, B, C>> {
+    private final Extractor<A> one;
+    private final Extractor<B> two;
+    private final Extractor<C> three;
+    
+    public TripExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three) {
+      super(sf, ImmutableList.<Extractor<?>>of(one, two, three));
+      this.one = one;
+      this.two = two;
+      this.three = three;
+    }
+
+    @Override
+    protected Tuple3<A, B, C> doCreate(Object[] values) {
+      return Tuple3.of((A) values[0], (B) values[1], (C) values[2]);
+    }
+    
+    @Override
+    public PType<Tuple3<A, B, C>> getPType(PTypeFamily ptf) {
+      return ptf.triples(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf));
+    }
+    
+    @Override
+    public Tuple3<A, B, C> getDefaultValue() {
+      return Tuple3.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue());
+    }
+    
+    @Override
+    public String toString() {
+      return "xtriple(" + one + "," + two + "," + three + ")";
+    }
+  };
+  
+  private static class QuadExtractor<A, B, C, D> extends AbstractCompositeExtractor<Tuple4<A, B, C, D>> {
+    private final Extractor<A> one;
+    private final Extractor<B> two;
+    private final Extractor<C> three;
+    private final Extractor<D> four;
+    
+    public QuadExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three,
+        Extractor<D> four) {
+      super(sf, ImmutableList.<Extractor<?>>of(one, two, three, four));
+      this.one = one;
+      this.two = two;
+      this.three = three;
+      this.four = four;
+    }
+    
+    @Override
+    protected Tuple4<A, B, C, D> doCreate(Object[] values) {
+      return Tuple4.of((A) values[0], (B) values[1], (C) values[2], (D) values[3]);
+    }
+
+    @Override
+    public PType<Tuple4<A, B, C, D>> getPType(PTypeFamily ptf) {
+      return ptf.quads(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf),
+          four.getPType(ptf));
+    }
+    
+    @Override
+    public Tuple4<A, B, C, D> getDefaultValue() {
+      return Tuple4.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue(),
+          four.getDefaultValue());
+    }
+    
+    @Override
+    public String toString() {
+      return "xquad(" + one + "," + two + "," + three + "," + four + ")";
+    }
+  };
+  
+  private static class TupleNExtractor extends AbstractCompositeExtractor<TupleN> {
+    private final Extractor[] extractors;
+    
+    public TupleNExtractor(TokenizerFactory scannerFactory, Extractor...extractors) {
+      super(scannerFactory, ImmutableList.<Extractor<?>>copyOf(extractors));
+      this.extractors = extractors;
+    }
+    
+    @Override
+    protected TupleN doCreate(Object[] values) {
+      return new TupleN(values);
+    }
+
+    @Override
+    public PType<TupleN> getPType(PTypeFamily ptf) {
+      PType[] ptypes = new PType[extractors.length];
+      for (int i = 0; i < ptypes.length; i++) {
+        ptypes[i] = extractors[i].getPType(ptf);
+      }
+      return ptf.tuples(ptypes);
+    }
+    
+    @Override
+    public TupleN getDefaultValue() {
+      Object[] values = new Object[extractors.length];
+      for (int i = 0; i < values.length; i++) {
+        values[i] = extractors[i].getDefaultValue();
+      }
+      return doCreate(values);
+    }
+    
+    @Override
+    public String toString() {
+      return "xtupleN(" + Joiner.on(',').join(extractors) + ")";
+    }
+  };
+  
+  /**
+   * Composite extractor that instantiates a user-supplied {@code Tuple} subclass through a
+   * reflectively resolved constructor whose parameter types are derived from the component
+   * extractors.
+   */
+  private static class CustomTupleExtractor<T extends Tuple> extends AbstractCompositeExtractor<T> {
+
+    private final Class<T> clazz;
+    private final Extractor[] extractors;
+
+    // Transient, so it is not carried along when this extractor is serialized. It is
+    // resolved in initialize() and, failing that, on first use.
+    private transient Constructor<T> constructor;
+
+    public CustomTupleExtractor(TokenizerFactory sf, Class<T> clazz, Extractor... extractors) {
+      super(sf, ImmutableList.<Extractor<?>>copyOf(extractors));
+      this.clazz = clazz;
+      this.extractors = extractors;
+    }
+
+    /**
+     * Resets the error state and resolves the constructor of the tuple class.
+     *
+     * @throws RuntimeException if the constructor lookup fails
+     */
+    @Override
+    public void initialize() {
+      super.initialize();
+      constructor = findConstructor();
+    }
+
+    /**
+     * Looks up the constructor of the tuple class whose parameter types are the type classes
+     * that the component extractors report for the Avro type family, in field order.
+     */
+    private Constructor<T> findConstructor() {
+      Class[] typeArgs = new Class[extractors.length];
+      for (int i = 0; i < typeArgs.length; i++) {
+        typeArgs[i] = extractors[i].getPType(
+            AvroTypeFamily.getInstance()).getTypeClass();
+      }
+      try {
+        return clazz.getConstructor(typeArgs);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Creates a new tuple instance from the given constructor arguments.
+     *
+     * @throws RuntimeException if the constructor cannot be resolved or invoked
+     */
+    @Override
+    public T doCreate(Object[] values) {
+      // The constructor is still unset if initialize() has not run on this instance, for
+      // example right after deserialization; resolve it here instead of failing.
+      if (constructor == null) {
+        constructor = findConstructor();
+      }
+      try {
+        return constructor.newInstance(values);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public PType<T> getPType(PTypeFamily ptf) {
+      PType[] ptypes = new PType[extractors.length];
+      for (int i = 0; i < ptypes.length; i++) {
+        ptypes[i] = extractors[i].getPType(ptf);
+      }
+      return ptf.tuples(clazz, ptypes);
+    }
+
+    /** The default tuple is built from the default values of the component extractors. */
+    @Override
+    public T getDefaultValue() {
+      Object[] values = new Object[extractors.length];
+      for (int i = 0; i < values.length; i++) {
+        values[i] = extractors[i].getDefaultValue();
+      }
+      return doCreate(values);
+    }
+
+    @Override
+    public String toString() {
+      return "Extractor(" + clazz + ")";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
new file mode 100644
index 0000000..a1c610b
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Static helpers that turn a {@code PCollection<String>} into a {@code PCollection} or
+ * {@code PTable} of strongly-typed values by running an {@link Extractor} over every element.
+ */
+public final class Parse {
+
+  // Non-instantiable.
+  private Parse() { }
+
+  /**
+   * Parses each line of {@code input} with {@code extractor}, using the type family of
+   * {@code input} for the result.
+   *
+   * @param groupName A label to use for tracking errors related to the parsing process
+   * @param input The input {@code PCollection<String>} to convert
+   * @param extractor The {@code Extractor<T>} that converts each line
+   * @return A {@code PCollection<T>}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input,
+      Extractor<T> extractor) {
+    PTypeFamily inputFamily = input.getTypeFamily();
+    return parse(groupName, input, inputFamily, extractor);
+  }
+
+  /**
+   * Parses each line of {@code input} with {@code extractor}, producing a collection whose
+   * {@code PType} is the one the extractor reports for {@code ptf}.
+   *
+   * @param groupName A label to use for tracking errors related to the parsing process
+   * @param input The input {@code PCollection<String>} to convert
+   * @param ptf The {@code PTypeFamily} of the returned {@code PCollection<T>}
+   * @param extractor The {@code Extractor<T>} that converts each line
+   * @return A {@code PCollection<T>}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input, PTypeFamily ptf,
+      Extractor<T> extractor) {
+    ExtractorFn<T> fn = new ExtractorFn<T>(groupName, extractor);
+    PType<T> resultType = extractor.getPType(ptf);
+    return input.parallelDo(groupName, fn, resultType);
+  }
+
+  /**
+   * Parses each line of {@code input} into a key/value pair with {@code extractor}, using the
+   * type family of {@code input} for the resulting table.
+   *
+   * @param groupName A label to use for tracking errors related to the parsing process
+   * @param input The input {@code PCollection<String>} to convert
+   * @param extractor The {@code Extractor<Pair<K, V>>} that converts each line
+   * @return A {@code PTable<K, V>}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      Extractor<Pair<K, V>> extractor) {
+    PTypeFamily inputFamily = input.getTypeFamily();
+    return parseTable(groupName, input, inputFamily, extractor);
+  }
+
+  /**
+   * Parses each line of {@code input} into a key/value pair with {@code extractor}. The table
+   * type is built in {@code ptf} from the first two sub-types of the extractor's pair type.
+   *
+   * @param groupName A label to use for tracking errors related to the parsing process
+   * @param input The input {@code PCollection<String>} to convert
+   * @param ptf The {@code PTypeFamily} of the returned {@code PTable<K, V>}
+   * @param extractor The {@code Extractor<Pair<K, V>>} that converts each line
+   * @return A {@code PTable<K, V>}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      PTypeFamily ptf, Extractor<Pair<K, V>> extractor) {
+    List<PType> keyAndValue = extractor.getPType(ptf).getSubTypes();
+    // Sub-type 0 is used as the key type and sub-type 1 as the value type.
+    PType<K> keyType = (PType<K>) keyAndValue.get(0);
+    PType<V> valueType = (PType<V>) keyAndValue.get(1);
+    PTableType<K, V> tableType = ptf.tableOf(keyType, valueType);
+    ExtractorFn<Pair<K, V>> fn = new ExtractorFn<Pair<K, V>>(groupName, extractor);
+    return input.parallelDo(groupName, fn, tableType);
+  }
+
+  /**
+   * {@code MapFn} that delegates each input string to an {@link Extractor} and, during cleanup,
+   * publishes the extractor's error statistics as counters under {@code groupName}.
+   */
+  private static class ExtractorFn<T> extends MapFn<String, T> {
+
+    private final String groupName;
+    private final Extractor<T> extractor;
+
+    public ExtractorFn(String groupName, Extractor<T> extractor) {
+      this.groupName = groupName;
+      this.extractor = extractor;
+    }
+
+    @Override
+    public void initialize() {
+      extractor.initialize();
+    }
+
+    @Override
+    public T map(String input) {
+      return extractor.extract(input);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      // Counters are only reported when a context is available.
+      if (getContext() == null) {
+        return;
+      }
+      ExtractorStats stats = extractor.getStats();
+      getCounter(groupName, "OVERALL_ERRORS").increment(stats.getErrorCount());
+      int field = 0;
+      for (Integer errorsForField : stats.getFieldErrors()) {
+        getCounter(groupName, "ERRORS_FOR_FIELD_" + field).increment(errorsForField);
+        field++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
new file mode 100644
index 0000000..8de90b6
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.Scanner;
+import java.util.Set;
+
+/**
+ * Manages a {@link Scanner} instance and provides support for returning only a subset
+ * of the fields returned by the underlying {@code Scanner}.
+ *
+ * <p>Fields are numbered from zero in the order the {@code Scanner} produces them. An empty
+ * index set disables filtering altogether, whatever the value of the {@code keep} flag.
+ */
+public class Tokenizer {
+
+  private final Scanner scanner;
+  private final Set<Integer> indices;
+  private final boolean keep;
+  /** Zero-based index of the token the underlying {@code Scanner} will produce next. */
+  private int current;
+
+  /**
+   * Create a new {@code Tokenizer} instance.
+   *
+   * @param scanner The scanner to manage
+   * @param indices The indices to keep/drop
+   * @param keep Whether the indices should be kept (true) or dropped (false)
+   * @throws IllegalArgumentException if any index is negative
+   */
+  public Tokenizer(Scanner scanner, Set<Integer> indices, boolean keep) {
+    this.scanner = scanner;
+    this.indices = checkIndices(indices);
+    this.keep = keep;
+    this.current = 0;
+  }
+
+  /** Rejects negative indices and returns the set unchanged. */
+  private static Set<Integer> checkIndices(Set<Integer> indices) {
+    for (Integer index : indices) {
+      if (index < 0) {
+        throw new IllegalArgumentException("All tokenizer indices must be non-negative");
+      }
+    }
+    return indices;
+  }
+
+  /**
+   * Returns true if the field at {@code index} must be skipped: in keep mode that is every
+   * index not in the set, in drop mode every index in the set.
+   */
+  private boolean isFiltered(int index) {
+    return !indices.isEmpty() && keep != indices.contains(index);
+  }
+
+  /**
+   * Consumes filtered-out tokens until the scanner is positioned on a retained field or is
+   * exhausted. The {@code hasNext()} guard applies to both the keep and the drop case.
+   */
+  private void advance() {
+    while (isFiltered(current) && scanner.hasNext()) {
+      scanner.next();
+      current++;
+    }
+  }
+
+  /**
+   * Returns true if at least one retained token remains. Filtered-out tokens are consumed
+   * first, so a true result means the next {@code next*()} call has a token to read.
+   */
+  public boolean hasNext() {
+    advance();
+    return scanner.hasNext();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next String from the {@code Scanner}.
+   *
+   * @return The next String from the {@code Scanner}
+   */
+  public String next() {
+    advance();
+    String token = scanner.next();
+    // Count the field only after a successful read so the index stays aligned with the scanner.
+    current++;
+    return token;
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Long from the {@code Scanner}.
+   *
+   * @return The next Long from the {@code Scanner}
+   */
+  public Long nextLong() {
+    advance();
+    Long value = scanner.nextLong();
+    current++;
+    return value;
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Boolean from the {@code Scanner}.
+   *
+   * @return The next Boolean from the {@code Scanner}
+   */
+  public Boolean nextBoolean() {
+    advance();
+    Boolean value = scanner.nextBoolean();
+    current++;
+    return value;
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Double from the {@code Scanner}.
+   *
+   * @return The next Double from the {@code Scanner}
+   */
+  public Double nextDouble() {
+    advance();
+    Double value = scanner.nextDouble();
+    current++;
+    return value;
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Float from the {@code Scanner}.
+   *
+   * @return The next Float from the {@code Scanner}
+   */
+  public Float nextFloat() {
+    advance();
+    Float value = scanner.nextFloat();
+    current++;
+    return value;
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Integer from the {@code Scanner}.
+   *
+   * @return The next Integer from the {@code Scanner}
+   */
+  public Integer nextInt() {
+    advance();
+    Integer value = scanner.nextInt();
+    current++;
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
new file mode 100644
index 0000000..f43478d
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.io.Serializable;
+import java.util.Locale;
+import java.util.Scanner;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Factory class that constructs {@link Tokenizer} instances for input strings that use a fixed
+ * set of delimiters, skip patterns, locales, and sets of indices to keep or drop.
+ */
+public class TokenizerFactory implements Serializable {
+
+  // Shared singleton; final so it can never be reassigned after class initialization.
+  private static final TokenizerFactory DEFAULT_INSTANCE = new TokenizerFactory(null, null, null,
+      ImmutableSet.<Integer>of(), true);
+
+  private final String delim;
+  private final String skip;
+  private final Locale locale;
+  private final Set<Integer> indices;
+  private final boolean keep;
+
+  /**
+   * Returns a default {@code TokenizerFactory} that uses whitespace as a delimiter and does
+   * not skip any input fields.
+   * @return The default {@code TokenizerFactory}
+   */
+  public static TokenizerFactory getDefaultInstance() { return DEFAULT_INSTANCE; }
+
+  private TokenizerFactory(String delim, String skip, Locale locale,
+      Set<Integer> indices, boolean keep) {
+    this.delim = delim;
+    this.skip = skip;
+    this.locale = locale;
+    this.indices = indices;
+    this.keep = keep;
+  }
+
+  /**
+   * Return a {@code Tokenizer} that wraps a new {@code Scanner} over the input string. The
+   * scanner is configured with this factory's delimiter, skip, and locale settings (each only
+   * when non-null), and the tokenizer with this factory's keep/drop indices.
+   *
+   * @param input The input string
+   * @return A new {@code Tokenizer} instance with appropriate settings
+   * @throws java.util.NoSuchElementException if a skip pattern is configured and does not match
+   *     at the start of {@code input} (the behavior of {@link Scanner#skip(String)})
+   */
+  public Tokenizer create(String input) {
+    Scanner s = new Scanner(input);
+    if (delim != null) {
+      s.useDelimiter(delim);
+    }
+    if (skip != null) {
+      s.skip(skip);
+    }
+    if (locale != null) {
+      s.useLocale(locale);
+    }
+    return new Tokenizer(s, indices, keep);
+  }
+
+  /**
+   * Factory method for creating a {@code TokenizerFactory.Builder} instance.
+   * @return A new {@code TokenizerFactory.Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * A class for constructing new {@code TokenizerFactory} instances using the Builder pattern.
+   * Unset string and locale options stay {@code null} and are left at the {@code Scanner}
+   * defaults by {@link TokenizerFactory#create(String)}; the index set starts out empty.
+   */
+  public static class Builder {
+    private String delim;
+    private String skip;
+    private Locale locale;
+    private Set<Integer> indices = ImmutableSet.of();
+    private boolean keep;
+
+    /**
+     * Sets the delimiter used by the {@code TokenizerFactory} instances constructed by
+     * this instance.
+     * @param delim The delimiter to use, which may be a regular expression
+     * @return This {@code Builder} instance
+     */
+    public Builder delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+
+    /**
+     * Sets the regular expression that determines which input characters should be
+     * ignored by the {@code Scanner} that is returned by the constructed
+     * {@code TokenizerFactory}. The pattern is passed to {@link Scanner#skip(String)} once,
+     * when the scanner is created.
+     *
+     * @param skip The regular expression of input values to ignore
+     * @return This {@code Builder} instance
+     */
+    public Builder skip(String skip) {
+      this.skip = skip;
+      return this;
+    }
+
+    /**
+     * Sets the {@code Locale} to use with the {@code TokenizerFactory} returned by
+     * this {@code Builder} instance.
+     *
+     * @param locale The locale to use
+     * @return This {@code Builder} instance
+     */
+    public Builder locale(Locale locale) {
+      this.locale = locale;
+      return this;
+    }
+
+    /**
+     * Keep only the specified fields found by the input scanner, counting from
+     * zero.
+     *
+     * @param indices The indices to keep
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if keep or drop indices were already set
+     */
+    public Builder keep(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set keep indices more than once");
+      this.indices = ImmutableSet.copyOf(indices);
+      this.keep = true;
+      return this;
+    }
+
+    /**
+     * Drop the specified fields found by the input scanner, counting from zero.
+     *
+     * @param indices The indices to drop
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if keep or drop indices were already set
+     */
+    public Builder drop(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set drop indices more than once");
+      this.indices = ImmutableSet.copyOf(indices);
+      this.keep = false;
+      return this;
+    }
+
+    /**
+     * Returns a new {@code TokenizerFactory} with settings determined by this
+     * {@code Builder} instance.
+     * @return A new {@code TokenizerFactory}
+     */
+    public TokenizerFactory build() {
+      return new TokenizerFactory(delim, skip, locale, indices, keep);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
new file mode 100644
index 0000000..4da7521
--- /dev/null
+++ b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import static org.apache.crunch.contrib.text.Extractors.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.contrib.text.Parse;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Unit tests for the {@link Extractors} factory methods, the {@link TokenizerFactory} settings
+ * they are combined with, and {@link Parse#parse}.
+ */
+public class ParseTest {
+
+  @Test
+  public void testInt() {
+    Integer parsed = xint().extract("1729");
+    assertEquals(Integer.valueOf(1729), parsed);
+    // "foo" is not an integer, so the supplied default is expected back.
+    Integer fallback = xint(321).extract("foo");
+    assertEquals(Integer.valueOf(321), fallback);
+  }
+
+  @Test
+  public void testString() {
+    String parsed = xstring().extract("bar");
+    assertEquals("bar", parsed);
+  }
+
+  @Test
+  public void testPairWithDrop() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").drop(0, 2).build();
+    Pair<Integer, String> expected = Pair.of(1, "abc");
+    Object actual = xpair(sf, xint(), xstring()).extract("foo,1,17.29,abc");
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testTripsWithSkip() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").skip("^foo").build();
+    Tuple3<Integer, String, Float> expected = Tuple3.of(17, "abc", 3.4f);
+    Object actual = xtriple(sf, xint(), xstring(), xfloat()).extract("foo17;abc;3.4");
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testTripsWithKeep() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").keep(1, 2, 3).build();
+    Tuple3<Integer, String, Float> expected = Tuple3.of(17, "abc", 3.4f);
+    Object actual = xtriple(sf, xint(), xstring(), xfloat()).extract("foo;17;abc;3.4");
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testQuadsWithWhitespace() {
+    TokenizerFactory sf = TokenizerFactory.getDefaultInstance();
+    Tuple4<Double, String, Boolean, Long> expected = Tuple4.of(1.3, "foo", true, 1L);
+    Object actual =
+        xquad(sf, xdouble(), xstring(), xboolean(), xlong()).extract("1.3   foo  true 1");
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testTupleN() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").build();
+    TupleN expected = new TupleN(1, false, true, 2, 3);
+    Object actual =
+        xtupleN(sf, xint(), xboolean(), xboolean(), xint(), xint()).extract("1,false,true,2,3");
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testCollections() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").build();
+    // Use 3000 as the default for values we can't parse
+    Extractor<Collection<Integer>> x = xcollect(sf, xint(3000));
+
+    Collection<Integer> clean = x.extract("1;2;3");
+    assertEquals(ImmutableList.of(1, 2, 3), clean);
+    assertFalse(x.errorOnLastRecord());
+
+    Collection<Integer> withBadField = x.extract("17;29;a");
+    assertEquals(ImmutableList.of(17, 29, 3000), withBadField);
+    assertTrue(x.errorOnLastRecord());
+    assertEquals(1, x.getStats().getErrorCount());
+  }
+
+  @Test
+  public void testNestedComposites() {
+    // The outer pair is split on ';' and each nested composite on ','.
+    TokenizerFactory outer = TokenizerFactory.builder().delimiter(";").build();
+    TokenizerFactory inner = TokenizerFactory.builder().delimiter(",").build();
+    Extractor<Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>>> extractor =
+        xpair(outer, xpair(inner, xlong(), xint()), xtriple(inner, xstring(), xint(), xfloat()));
+    Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>> actual =
+        extractor.extract("1,2;a,17,29");
+    assertEquals(Pair.of(Pair.of(1L, 2), Tuple3.of("a", 17, 29f)), actual);
+  }
+
+  @Test
+  public void testParse() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").build();
+    PCollection<String> lines = MemPipeline.typedCollectionOf(Avros.strings(), "1,3.0");
+    PCollection<Pair<Integer, Float>> parsed = Parse.parse("test", lines,
+        xpair(sf, xint(), xfloat()));
+    Iterable<Pair<Integer, Float>> it = parsed.materialize();
+    assertEquals(ImmutableList.of(Pair.of(1, 3.0f)), it);
+  }
+}