You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/13 20:20:56 UTC
git commit: CRUNCH-97: Add support for parsing structured records out
of text files to crunch-contrib.
Updated Branches:
refs/heads/master 4a3aa0dfd -> ac317a185
CRUNCH-97: Add support for parsing structured records out of text files to crunch-contrib.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/ac317a18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/ac317a18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/ac317a18
Branch: refs/heads/master
Commit: ac317a185401cb0ef1afe7096b51ac29106eb1e2
Parents: 4a3aa0d
Author: Josh Wills <jw...@apache.org>
Authored: Mon Oct 15 20:04:18 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Dec 13 11:18:14 2012 -0800
----------------------------------------------------------------------
.../contrib/text/AbstractCompositeExtractor.java | 100 +++
.../contrib/text/AbstractSimpleExtractor.java | 97 +++
.../org/apache/crunch/contrib/text/Extractor.java | 67 ++
.../apache/crunch/contrib/text/ExtractorStats.java | 59 ++
.../org/apache/crunch/contrib/text/Extractors.java | 548 +++++++++++++++
.../java/org/apache/crunch/contrib/text/Parse.java | 132 ++++
.../org/apache/crunch/contrib/text/Tokenizer.java | 135 ++++
.../crunch/contrib/text/TokenizerFactory.java | 173 +++++
.../org/apache/crunch/contrib/text/ParseTest.java | 120 ++++
9 files changed, 1431 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
new file mode 100644
index 0000000..f26fc57
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for {@code Extractor} instances that delegates the parsing of fields to other
+ * {@code Extractor} instances, primarily used for constructing composite records that implement
+ * the {@code Tuple} interface.
+ */
+public abstract class AbstractCompositeExtractor<T> implements Extractor<T> {
+
+ private final TokenizerFactory tokenizerFactory;
+ private int errors = 0;
+ private boolean errorOnLast;
+ private final List<Extractor<?>> extractors;
+
+ public AbstractCompositeExtractor(TokenizerFactory scannerFactory, List<Extractor<?>> extractors) {
+ Preconditions.checkArgument(extractors.size() > 0);
+ this.tokenizerFactory = scannerFactory;
+ this.extractors = extractors;
+ }
+
+ @Override
+ public T extract(String input) {
+ errorOnLast = false;
+ Tokenizer tokenizer = tokenizerFactory.create(input);
+ Object[] values = new Object[extractors.size()];
+ try {
+ for (int i = 0; i < values.length; i++) {
+ values[i] = extractors.get(i).extract(tokenizer.next());
+ if (extractors.get(i).errorOnLastRecord() && !errorOnLast) {
+ errors++;
+ errorOnLast = true;
+ }
+ }
+ } catch (Exception e) {
+ if (!errorOnLast) {
+ errors++;
+ errorOnLast = true;
+ }
+ return getDefaultValue();
+ }
+
+ return doCreate(values);
+ }
+
+ @Override
+ public void initialize() {
+ this.errors = 0;
+ this.errorOnLast = false;
+ for (Extractor<?> x : extractors) {
+ x.initialize();
+ }
+ }
+
+ @Override
+ public boolean errorOnLastRecord() {
+ return errorOnLast;
+ }
+
+ @Override
+ public ExtractorStats getStats() {
+ return new ExtractorStats(errors, Lists.transform(extractors, new Function<Extractor<?>, Integer>() {
+ @Override
+ public Integer apply(Extractor<?> input) {
+ return input.getStats().getErrorCount();
+ }
+ }));
+ }
+
+ /**
+ * Subclasses should return a new instance of the object based on the fields parsed by
+ * the {@code Extractor} instances for this composite {@code Extractor} instance.
+ *
+ * @param values The values that were extracted by the component {@code Extractor} objects
+ * @return A new instance of the composite class for this {@code Extractor}
+ */
+ protected abstract T doCreate(Object[] values);
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
new file mode 100644
index 0000000..9959b44
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for the common case {@code Extractor} instances that construct a single
+ * object from a block of text stored in a {@code String}, with support for error handling
+ * and reporting.
+ */
+public abstract class AbstractSimpleExtractor<T> implements Extractor<T> {
+
+ private static final Log LOG = LogFactory.getLog(AbstractSimpleExtractor.class);
+
+ /** Maximum number of parse failures written to the log between calls to {@code initialize()}. */
+ private static final int LOG_ERROR_LIMIT = 100;
+
+ private int errors;
+ private boolean errorOnLast;
+ private final T defaultValue;
+ private final TokenizerFactory scannerFactory;
+
+ /**
+  * Creates an instance that tokenizes its input with {@code TokenizerFactory.getDefaultInstance()}.
+  *
+  * @param defaultValue The value returned by {@code extract} when parsing fails
+  */
+ public AbstractSimpleExtractor(T defaultValue) {
+   this(defaultValue, TokenizerFactory.getDefaultInstance());
+ }
+
+ /**
+  * @param defaultValue The value returned by {@code extract} when parsing fails
+  * @param scannerFactory Creates the {@code Tokenizer} handed to {@code doExtract}
+  */
+ public AbstractSimpleExtractor(T defaultValue, TokenizerFactory scannerFactory) {
+   this.defaultValue = defaultValue;
+   this.scannerFactory = scannerFactory;
+ }
+
+ /** Resets the error count and the last-record error flag. */
+ @Override
+ public void initialize() {
+   this.errors = 0;
+   this.errorOnLast = false;
+ }
+
+ /**
+  * Parses {@code input} via {@link #doExtract(Tokenizer)}. Any exception is counted, logged
+  * (for the first {@code LOG_ERROR_LIMIT} failures only) and answered with the default value.
+  *
+  * @param input The text to parse
+  * @return The parsed value, or the default value if an exception was thrown
+  */
+ @Override
+ public T extract(String input) {
+   errorOnLast = false;
+   T res = defaultValue;
+   try {
+     res = doExtract(scannerFactory.create(input));
+   } catch (Exception e) {
+     errorOnLast = true;
+     errors++;
+     // errors already includes this failure, so "<=" lets exactly LOG_ERROR_LIMIT
+     // failures reach the log before logging is suppressed.
+     if (errors <= LOG_ERROR_LIMIT) {
+       String msg = String.format("Error occurred parsing input '%s' using extractor %s",
+           input, this);
+       LOG.error(msg, e);
+     }
+   }
+   return res;
+ }
+
+ @Override
+ public boolean errorOnLastRecord() {
+   return errorOnLast;
+ }
+
+ @Override
+ public T getDefaultValue() {
+   return defaultValue;
+ }
+
+ @Override
+ public ExtractorStats getStats() {
+   return new ExtractorStats(errors);
+ }
+
+ /**
+  * Subclasses must override this method to return a new instance of the
+  * class that this {@code Extractor} instance is designed to parse.
+  * <p>Any runtime parsing exceptions from the given {@code Tokenizer} instance
+  * should be thrown so that they may be caught by the error handling logic
+  * inside of this class.
+  *
+  * @param tokenizer The {@code Tokenizer} instance for the current record
+  * @return A new instance of the type defined for this class
+  */
+ protected abstract T doExtract(Tokenizer tokenizer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
new file mode 100644
index 0000000..1e7f6b7
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.io.Serializable;
+
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * An interface for extracting a specific data type from a text string.
+ * The implementations in this package split the string with a {@code Tokenizer}.
+ *
+ * @param <T> The data type to be extracted
+ */
+public interface Extractor<T> extends Serializable {
+
+ /**
+ * Extract a value with the type of this instance.
+ *
+ * @param input The text to parse
+ * @return The extracted value
+ */
+ T extract(String input);
+
+ /**
+ * Returns the {@code PType} associated with this data type for the
+ * given {@code PTypeFamily}.
+ *
+ * @param ptf The type family to take the {@code PType} from
+ */
+ PType<T> getPType(PTypeFamily ptf);
+
+ /**
+ * Returns the default value for this {@code Extractor} in case of an
+ * error.
+ */
+ T getDefaultValue();
+
+ /**
+ * Perform any initialization required by this {@code Extractor} during the
+ * start of a map or reduce task.
+ */
+ void initialize();
+
+ /**
+ * Returns true if the last call to {@code extract} on this instance
+ * threw an exception that was handled.
+ */
+ boolean errorOnLastRecord();
+
+ /**
+ * Return statistics about how many errors this {@code Extractor} instance
+ * encountered while parsing input data.
+ */
+ ExtractorStats getStats();
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
new file mode 100644
index 0000000..7dc37f4
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Records the number and kind of errors that an {@code Extractor} encountered when parsing
+ * input data. Instances are immutable snapshots.
+ */
+public class ExtractorStats {
+
+ private final int errorCount;
+ private final List<Integer> fieldErrors;
+
+ /**
+  * Creates stats without any per-field error counts.
+  *
+  * @param errorCount The number of records that had some kind of parsing error
+  */
+ public ExtractorStats(int errorCount) {
+   this(errorCount, ImmutableList.<Integer>of());
+ }
+
+ /**
+  * @param errorCount The number of records that had some kind of parsing error
+  * @param fieldErrors The error count of each field of a composite record; copied, so later
+  *        changes to the given list do not affect this instance. {@code null} is treated
+  *        as an empty list.
+  */
+ public ExtractorStats(int errorCount, List<Integer> fieldErrors) {
+   this.errorCount = errorCount;
+   // Defensive copy: the caller may hand in a mutable list or a lazily evaluated view.
+   if (fieldErrors == null) {
+     this.fieldErrors = ImmutableList.<Integer>of();
+   } else {
+     this.fieldErrors = ImmutableList.copyOf(fieldErrors);
+   }
+ }
+
+ /**
+  * The overall number of records that had some kind of parsing error.
+  * @return The overall number of records that had some kind of parsing error
+  */
+ public int getErrorCount() {
+   return errorCount;
+ }
+
+ /**
+  * Returns the number of errors that occurred when parsing the individual fields of
+  * a composite record type, like a {@code Pair} or {@code TupleN}.
+  * @return An unmodifiable list with the number of errors that occurred when parsing the
+  *         individual fields of a composite record type; empty for non-composite types
+  */
+ public List<Integer> getFieldErrors() {
+   return fieldErrors;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
new file mode 100644
index 0000000..0ed1282
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Scanner;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Factory methods for constructing common {@code Extractor} types.
+ */
+public class Extractors {
+
+ /**
+  * Returns an Extractor for integers that falls back to {@code 0} when parsing fails.
+  */
+ public static Extractor<Integer> xint() {
+   final Integer fallback = 0;
+   return xint(fallback);
+ }
+
+ /**
+  * Returns an Extractor for integers.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<Integer> xint(Integer defaultValue) {
+   final Extractor<Integer> ints = new IntExtractor(defaultValue);
+   return ints;
+ }
+
+ /**
+  * Returns an Extractor for longs that falls back to {@code 0L} when parsing fails.
+  */
+ public static Extractor<Long> xlong() {
+   final Long fallback = 0L;
+   return xlong(fallback);
+ }
+
+ /**
+  * Returns an Extractor for longs.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<Long> xlong(Long defaultValue) {
+   final Extractor<Long> longs = new LongExtractor(defaultValue);
+   return longs;
+ }
+
+ /**
+  * Returns an Extractor for floats that falls back to {@code 0f} when parsing fails.
+  */
+ public static Extractor<Float> xfloat() {
+   final Float fallback = 0f;
+   return xfloat(fallback);
+ }
+
+ /**
+  * Returns an Extractor for floats.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<Float> xfloat(Float defaultValue) {
+   final Extractor<Float> floats = new FloatExtractor(defaultValue);
+   return floats;
+ }
+
+ /**
+  * Returns an Extractor for doubles that falls back to {@code 0.0} when parsing fails.
+  */
+ public static Extractor<Double> xdouble() {
+   final Double fallback = 0.0;
+   return xdouble(fallback);
+ }
+
+ /**
+  * Returns an Extractor for doubles.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<Double> xdouble(Double defaultValue) {
+   final Extractor<Double> doubles = new DoubleExtractor(defaultValue);
+   return doubles;
+ }
+
+ /**
+  * Returns an Extractor for booleans that falls back to {@code false} when parsing fails.
+  */
+ public static Extractor<Boolean> xboolean() {
+   final Boolean fallback = false;
+   return xboolean(fallback);
+ }
+
+ /**
+  * Returns an Extractor for booleans.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<Boolean> xboolean(Boolean defaultValue) {
+   final Extractor<Boolean> booleans = new BooleanExtractor(defaultValue);
+   return booleans;
+ }
+
+ /**
+  * Returns an Extractor for strings that falls back to the empty string when parsing fails.
+  */
+ public static Extractor<String> xstring() {
+   final String fallback = "";
+   return xstring(fallback);
+ }
+
+ /**
+  * Returns an Extractor for strings.
+  *
+  * @param defaultValue The value to use when the input cannot be parsed
+  */
+ public static Extractor<String> xstring(String defaultValue) {
+   final Extractor<String> strings = new StringExtractor(defaultValue);
+   return strings;
+ }
+
+ /**
+  * Returns an Extractor that splits its input with the given {@code TokenizerFactory} and
+  * applies the given extractor to every token, collecting the results.
+  */
+ public static <T> Extractor<Collection<T>> xcollect(TokenizerFactory scannerFactory, Extractor<T> extractor) {
+   final Extractor<Collection<T>> collector = new CollectionExtractor<T>(scannerFactory, extractor);
+   return collector;
+ }
+
+ /**
+  * Returns an Extractor for {@code Pair}s: the input is split with the given
+  * {@code TokenizerFactory} and the two sub-fields are parsed by {@code one} and {@code two}.
+  */
+ public static <K, V> Extractor<Pair<K, V>> xpair(TokenizerFactory scannerFactory,
+     Extractor<K> one, Extractor<V> two) {
+   final Extractor<Pair<K, V>> pairs = new PairExtractor<K, V>(scannerFactory, one, two);
+   return pairs;
+ }
+
+ /**
+  * Returns an Extractor for {@code Tuple3}s: the input is split with the given
+  * {@code TokenizerFactory} and the three sub-fields are parsed by {@code a}, {@code b}
+  * and {@code c}.
+  */
+ public static <A, B, C> Extractor<Tuple3<A, B, C>> xtriple(TokenizerFactory scannerFactory, Extractor<A> a,
+     Extractor<B> b, Extractor<C> c) {
+   final Extractor<Tuple3<A, B, C>> triples = new TripExtractor<A, B, C>(scannerFactory, a, b, c);
+   return triples;
+ }
+
+ /**
+  * Returns an Extractor for {@code Tuple4}s: the input is split with the given
+  * {@code TokenizerFactory} and the four sub-fields are parsed by {@code a}, {@code b},
+  * {@code c} and {@code d}.
+  */
+ public static <A, B, C, D> Extractor<Tuple4<A, B, C, D>> xquad(TokenizerFactory scannerFactory, Extractor<A> a,
+     Extractor<B> b, Extractor<C> c, Extractor<D> d) {
+   final Extractor<Tuple4<A, B, C, D>> quads = new QuadExtractor<A, B, C, D>(scannerFactory, a, b, c, d);
+   return quads;
+ }
+
+ /**
+  * Returns an Extractor for {@code TupleN}s with an arbitrary number of fields: the input is
+  * split with the given {@code TokenizerFactory} and the i-th sub-field is parsed by the
+  * i-th of the given extractors.
+  */
+ public static Extractor<TupleN> xtupleN(TokenizerFactory scannerFactory, Extractor...extractors) {
+   final Extractor<TupleN> tuples = new TupleNExtractor(scannerFactory, extractors);
+   return tuples;
+ }
+
+ /**
+  * Returns an Extractor for the given subclass of {@code Tuple}, which must have a
+  * constructor matching the types of the given extractors. The given
+  * {@code TokenizerFactory} is used for parsing the sub-fields.
+  */
+ public static <T extends Tuple> Extractor<T> xcustom(Class<T> clazz, TokenizerFactory scannerFactory, Extractor... extractors) {
+   final Extractor<T> custom = new CustomTupleExtractor<T>(scannerFactory, clazz, extractors);
+   return custom;
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.nextInt()}. */
+ private static class IntExtractor extends AbstractSimpleExtractor<Integer> {
+
+   public IntExtractor(Integer defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xint";
+   }
+
+   @Override
+   public PType<Integer> getPType(PTypeFamily ptf) {
+     final PType<Integer> type = ptf.ints();
+     return type;
+   }
+
+   @Override
+   protected Integer doExtract(Tokenizer tokenizer) {
+     final Integer parsed = tokenizer.nextInt();
+     return parsed;
+   }
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.nextLong()}. */
+ private static class LongExtractor extends AbstractSimpleExtractor<Long> {
+
+   public LongExtractor(Long defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xlong";
+   }
+
+   @Override
+   public PType<Long> getPType(PTypeFamily ptf) {
+     final PType<Long> type = ptf.longs();
+     return type;
+   }
+
+   @Override
+   protected Long doExtract(Tokenizer tokenizer) {
+     final Long parsed = tokenizer.nextLong();
+     return parsed;
+   }
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.nextFloat()}. */
+ private static class FloatExtractor extends AbstractSimpleExtractor<Float> {
+
+   public FloatExtractor(Float defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xfloat";
+   }
+
+   @Override
+   public PType<Float> getPType(PTypeFamily ptf) {
+     final PType<Float> type = ptf.floats();
+     return type;
+   }
+
+   @Override
+   protected Float doExtract(Tokenizer tokenizer) {
+     final Float parsed = tokenizer.nextFloat();
+     return parsed;
+   }
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.nextDouble()}. */
+ private static class DoubleExtractor extends AbstractSimpleExtractor<Double> {
+
+   public DoubleExtractor(Double defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xdouble";
+   }
+
+   @Override
+   public PType<Double> getPType(PTypeFamily ptf) {
+     final PType<Double> type = ptf.doubles();
+     return type;
+   }
+
+   @Override
+   protected Double doExtract(Tokenizer tokenizer) {
+     final Double parsed = tokenizer.nextDouble();
+     return parsed;
+   }
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.nextBoolean()}. */
+ private static class BooleanExtractor extends AbstractSimpleExtractor<Boolean> {
+
+   public BooleanExtractor(Boolean defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xboolean";
+   }
+
+   @Override
+   public PType<Boolean> getPType(PTypeFamily ptf) {
+     final PType<Boolean> type = ptf.booleans();
+     return type;
+   }
+
+   @Override
+   protected Boolean doExtract(Tokenizer tokenizer) {
+     final Boolean parsed = tokenizer.nextBoolean();
+     return parsed;
+   }
+ }
+
+ /** Simple extractor that reads its value with {@code Tokenizer.next()}. */
+ private static class StringExtractor extends AbstractSimpleExtractor<String> {
+
+   public StringExtractor(String defaultValue) {
+     super(defaultValue);
+   }
+
+   @Override
+   public String toString() {
+     return "xstring";
+   }
+
+   @Override
+   public PType<String> getPType(PTypeFamily ptf) {
+     final PType<String> type = ptf.strings();
+     return type;
+   }
+
+   @Override
+   protected String doExtract(Tokenizer tokenizer) {
+     final String token = tokenizer.next();
+     return token;
+   }
+ }
+
+ /**
+  * Extractor that splits its input with a {@code TokenizerFactory} and applies a single
+  * element extractor to every token, returning all results as a {@code Collection}.
+  */
+ private static class CollectionExtractor<T> implements Extractor<Collection<T>> {
+
+   private final TokenizerFactory tokenizerFactory;
+   private final Extractor<T> extractor;
+   private int errors = 0;
+   private boolean errorOnLast;
+
+   public CollectionExtractor(TokenizerFactory tokenizerFactory, Extractor<T> extractor) {
+     this.tokenizerFactory = tokenizerFactory;
+     this.extractor = extractor;
+   }
+
+   /**
+    * Parses every token of {@code input} with the element extractor.
+    *
+    * @param input The text holding the elements of the collection
+    * @return The parsed elements in token order; elements that failed to parse are
+    *         represented by whatever the element extractor returned for them
+    */
+   @Override
+   public Collection<T> extract(String input) {
+     errorOnLast = false;
+     Tokenizer tokenizer = tokenizerFactory.create(input);
+     Collection<T> parsed = Lists.newArrayList();
+     while (tokenizer.hasNext()) {
+       parsed.add(extractor.extract(tokenizer.next()));
+       // Count the record only once even if several of its elements are bad.
+       if (extractor.errorOnLastRecord() && !errorOnLast) {
+         errorOnLast = true;
+         errors++;
+       }
+     }
+     return parsed;
+   }
+
+   @Override
+   public PType<Collection<T>> getPType(PTypeFamily ptf) {
+     return ptf.collections(extractor.getPType(ptf));
+   }
+
+   @Override
+   public Collection<T> getDefaultValue() {
+     return ImmutableList.<T>of();
+   }
+
+   /**
+    * Returns the record-level error count plus the error count of the element extractor
+    * as the single field-level entry.
+    */
+   @Override
+   public ExtractorStats getStats() {
+     return new ExtractorStats(errors,
+         ImmutableList.of(extractor.getStats().getErrorCount()));
+   }
+
+   @Override
+   public void initialize() {
+     this.errorOnLast = false;
+     this.errors = 0;
+     extractor.initialize();
+   }
+
+   @Override
+   public boolean errorOnLastRecord() {
+     return errorOnLast;
+   }
+
+   // Every other extractor in this file describes itself by its factory method name;
+   // do the same here so nested descriptions (e.g. inside xpair) stay readable.
+   @Override
+   public String toString() {
+     return "xcollect(" + extractor + ")";
+   }
+ }
+
+ private static class PairExtractor<K, V> extends AbstractCompositeExtractor<Pair<K, V>> {
+ private final Extractor<K> one;
+ private final Extractor<V> two;
+
+ public PairExtractor(TokenizerFactory scannerFactory, Extractor<K> one, Extractor<V> two) {
+ super(scannerFactory, ImmutableList.<Extractor<?>>of(one, two));
+ this.one = one;
+ this.two = two;
+ }
+
+ @Override
+ protected Pair<K, V> doCreate(Object[] values) {
+ return Pair.of((K) values[0], (V) values[1]);
+ }
+
+ @Override
+ public PType<Pair<K, V>> getPType(PTypeFamily ptf) {
+ return ptf.pairs(one.getPType(ptf), two.getPType(ptf));
+ }
+
+ @Override
+ public String toString() {
+ return "xpair(" + one + "," + two + ")";
+ }
+
+ @Override
+ public Pair<K, V> getDefaultValue() {
+ return Pair.of(one.getDefaultValue(), two.getDefaultValue());
+ }
+ };
+
+ private static class TripExtractor<A, B, C> extends AbstractCompositeExtractor<Tuple3<A, B, C>> {
+ private final Extractor<A> one;
+ private final Extractor<B> two;
+ private final Extractor<C> three;
+
+ public TripExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three) {
+ super(sf, ImmutableList.<Extractor<?>>of(one, two, three));
+ this.one = one;
+ this.two = two;
+ this.three = three;
+ }
+
+ @Override
+ protected Tuple3<A, B, C> doCreate(Object[] values) {
+ return Tuple3.of((A) values[0], (B) values[1], (C) values[2]);
+ }
+
+ @Override
+ public PType<Tuple3<A, B, C>> getPType(PTypeFamily ptf) {
+ return ptf.triples(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf));
+ }
+
+ @Override
+ public Tuple3<A, B, C> getDefaultValue() {
+ return Tuple3.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue());
+ }
+
+ @Override
+ public String toString() {
+ return "xtriple(" + one + "," + two + "," + three + ")";
+ }
+ };
+
+ private static class QuadExtractor<A, B, C, D> extends AbstractCompositeExtractor<Tuple4<A, B, C, D>> {
+ private final Extractor<A> one;
+ private final Extractor<B> two;
+ private final Extractor<C> three;
+ private final Extractor<D> four;
+
+ public QuadExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three,
+ Extractor<D> four) {
+ super(sf, ImmutableList.<Extractor<?>>of(one, two, three, four));
+ this.one = one;
+ this.two = two;
+ this.three = three;
+ this.four = four;
+ }
+
+ @Override
+ protected Tuple4<A, B, C, D> doCreate(Object[] values) {
+ return Tuple4.of((A) values[0], (B) values[1], (C) values[2], (D) values[3]);
+ }
+
+ @Override
+ public PType<Tuple4<A, B, C, D>> getPType(PTypeFamily ptf) {
+ return ptf.quads(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf),
+ four.getPType(ptf));
+ }
+
+ @Override
+ public Tuple4<A, B, C, D> getDefaultValue() {
+ return Tuple4.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue(),
+ four.getDefaultValue());
+ }
+
+ @Override
+ public String toString() {
+ return "xquad(" + one + "," + two + "," + three + "," + four + ")";
+ }
+ };
+
+ private static class TupleNExtractor extends AbstractCompositeExtractor<TupleN> {
+ private final Extractor[] extractors;
+
+ public TupleNExtractor(TokenizerFactory scannerFactory, Extractor...extractors) {
+ super(scannerFactory, ImmutableList.<Extractor<?>>copyOf(extractors));
+ this.extractors = extractors;
+ }
+
+ @Override
+ protected TupleN doCreate(Object[] values) {
+ return new TupleN(values);
+ }
+
+ @Override
+ public PType<TupleN> getPType(PTypeFamily ptf) {
+ PType[] ptypes = new PType[extractors.length];
+ for (int i = 0; i < ptypes.length; i++) {
+ ptypes[i] = extractors[i].getPType(ptf);
+ }
+ return ptf.tuples(ptypes);
+ }
+
+ @Override
+ public TupleN getDefaultValue() {
+ Object[] values = new Object[extractors.length];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = extractors[i].getDefaultValue();
+ }
+ return doCreate(values);
+ }
+
+ @Override
+ public String toString() {
+ return "xtupleN(" + Joiner.on(',').join(extractors) + ")";
+ }
+ };
+
+ /**
+  * Composite extractor that builds instances of a custom {@code Tuple} subclass by
+  * reflectively invoking a public constructor of that class. The constructor's parameter
+  * types are the type classes that the component extractors' {@code PType}s report for the
+  * Avro type family.
+  */
+ private static class CustomTupleExtractor<T extends Tuple> extends AbstractCompositeExtractor<T> {
+
+   private final Class<T> clazz;
+   private final Extractor[] extractors;
+
+   // Not serialized; it is looked up in initialize() or on first use.
+   private transient Constructor<T> constructor;
+
+   public CustomTupleExtractor(TokenizerFactory sf, Class<T> clazz, Extractor... extractors) {
+     super(sf, ImmutableList.<Extractor<?>>copyOf(extractors));
+     this.clazz = clazz;
+     this.extractors = extractors;
+   }
+
+   /**
+    * Initializes the component extractors and looks up the tuple constructor.
+    *
+    * @throws RuntimeException if no matching public constructor can be found
+    */
+   @Override
+   public void initialize() {
+     super.initialize();
+     constructor = resolveConstructor();
+   }
+
+   /** Looks up the public constructor of {@code clazz} that matches the component types. */
+   private Constructor<T> resolveConstructor() {
+     Class[] typeArgs = new Class[extractors.length];
+     for (int i = 0; i < typeArgs.length; i++) {
+       typeArgs[i] = extractors[i].getPType(
+           AvroTypeFamily.getInstance()).getTypeClass();
+     }
+     try {
+       return clazz.getConstructor(typeArgs);
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+   }
+
+   /**
+    * Creates the tuple from the given field values.
+    *
+    * @throws RuntimeException if the constructor cannot be found or invoked
+    */
+   @Override
+   public T doCreate(Object[] values) {
+     // The constructor is null if initialize() has not run on this instance yet, e.g.
+     // after deserialization or when getDefaultValue() is called first; resolve it lazily.
+     if (constructor == null) {
+       constructor = resolveConstructor();
+     }
+     try {
+       return constructor.newInstance(values);
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+   }
+
+   @Override
+   public PType<T> getPType(PTypeFamily ptf) {
+     PType[] ptypes = new PType[extractors.length];
+     for (int i = 0; i < ptypes.length; i++) {
+       ptypes[i] = extractors[i].getPType(ptf);
+     }
+     return ptf.tuples(clazz, ptypes);
+   }
+
+   /** Builds the default tuple from the default value of every component extractor. */
+   @Override
+   public T getDefaultValue() {
+     Object[] values = new Object[extractors.length];
+     for (int i = 0; i < values.length; i++) {
+       values[i] = extractors[i].getDefaultValue();
+     }
+     return doCreate(values);
+   }
+
+   @Override
+   public String toString() {
+     return "Extractor(" + clazz + ")";
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
new file mode 100644
index 0000000..a1c610b
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Static helpers that convert a {@code PCollection<String>} of text lines into a typed
+ * {@code PCollection} or {@code PTable} by applying an {@link Extractor} to every line.
+ */
+public final class Parse {
+
+  // Utility class: static methods only, never instantiated.
+  private Parse() { }
+
+  /**
+   * Applies {@code extractor} to each line of {@code input}, using the type family of
+   * {@code input} for the resulting collection.
+   *
+   * @param groupName The name of the processing step, also used as the counter group for parse errors
+   * @param input The lines to parse
+   * @param extractor The {@code Extractor<T>} applied to each line
+   * @return The extracted values as a {@code PCollection<T>}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input,
+      Extractor<T> extractor) {
+    PTypeFamily family = input.getTypeFamily();
+    return parse(groupName, input, family, extractor);
+  }
+
+  /**
+   * Applies {@code extractor} to each line of {@code input}, taking the output type from
+   * the given {@code PTypeFamily}.
+   *
+   * @param groupName The name of the processing step, also used as the counter group for parse errors
+   * @param input The lines to parse
+   * @param ptf The {@code PTypeFamily} that supplies the output {@code PType}
+   * @param extractor The {@code Extractor<T>} applied to each line
+   * @return The extracted values as a {@code PCollection<T>}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input, PTypeFamily ptf,
+      Extractor<T> extractor) {
+    ExtractorFn<T> fn = new ExtractorFn<T>(groupName, extractor);
+    PType<T> outputType = extractor.getPType(ptf);
+    return input.parallelDo(groupName, fn, outputType);
+  }
+
+  /**
+   * Applies a pair-producing {@code extractor} to each line of {@code input} and returns
+   * the result as a table, using the type family of {@code input}.
+   *
+   * @param groupName The name of the processing step, also used as the counter group for parse errors
+   * @param input The lines to parse
+   * @param extractor The {@code Extractor<Pair<K, V>>} applied to each line
+   * @return The extracted pairs as a {@code PTable<K, V>}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      Extractor<Pair<K, V>> extractor) {
+    PTypeFamily family = input.getTypeFamily();
+    return parseTable(groupName, input, family, extractor);
+  }
+
+  /**
+   * Applies a pair-producing {@code extractor} to each line of {@code input} and returns
+   * the result as a table whose key and value types are the first and second sub-types of
+   * the extractor's {@code PType} in the given {@code PTypeFamily}.
+   *
+   * @param groupName The name of the processing step, also used as the counter group for parse errors
+   * @param input The lines to parse
+   * @param ptf The {@code PTypeFamily} that supplies the table type
+   * @param extractor The {@code Extractor<Pair<K, V>>} applied to each line
+   * @return The extracted pairs as a {@code PTable<K, V>}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      PTypeFamily ptf, Extractor<Pair<K, V>> extractor) {
+    List<PType> componentTypes = extractor.getPType(ptf).getSubTypes();
+    PType<K> keyType = (PType<K>) componentTypes.get(0);
+    PType<V> valueType = (PType<V>) componentTypes.get(1);
+    PTableType<K, V> tableType = ptf.tableOf(keyType, valueType);
+    ExtractorFn<Pair<K, V>> fn = new ExtractorFn<Pair<K, V>>(groupName, extractor);
+    return input.parallelDo(groupName, fn, tableType);
+  }
+
+  /**
+   * Map function that delegates each input line to an {@link Extractor} and, on cleanup,
+   * publishes the extractor's error statistics as counters under {@code groupName}.
+   */
+  private static class ExtractorFn<T> extends MapFn<String, T> {
+
+    private final String groupName;
+    private final Extractor<T> extractor;
+
+    public ExtractorFn(String groupName, Extractor<T> extractor) {
+      this.groupName = groupName;
+      this.extractor = extractor;
+    }
+
+    @Override
+    public void initialize() {
+      extractor.initialize();
+    }
+
+    @Override
+    public T map(String input) {
+      return extractor.extract(input);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      // Counters are only reported when a context is available.
+      if (getContext() == null) {
+        return;
+      }
+      ExtractorStats stats = extractor.getStats();
+      getCounter(groupName, "OVERALL_ERRORS").increment(stats.getErrorCount());
+      int field = 0;
+      for (Integer errorCount : stats.getFieldErrors()) {
+        getCounter(groupName, "ERRORS_FOR_FIELD_" + field).increment(errorCount);
+        field++;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
new file mode 100644
index 0000000..8de90b6
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.Scanner;
+import java.util.Set;
+
+/**
+ * Wraps a {@link Scanner} and optionally filters its tokens by zero-based position, so
+ * that callers only see the fields they asked to keep (or never see the ones they asked
+ * to drop).
+ *
+ * <p>When the index set is empty no filtering takes place and every token of the
+ * underlying {@code Scanner} is returned.
+ */
+public class Tokenizer {
+
+  private final Scanner scanner;
+  private final Set<Integer> indices;
+  private final boolean keep;
+  // Zero-based position of the token the scanner is currently lined up on; -1 before the first read.
+  private int current;
+  // True when filtered tokens have already been skipped for the upcoming read.
+  private boolean positioned;
+
+  /**
+   * Create a new {@code Tokenizer} instance.
+   *
+   * @param scanner The scanner to manage
+   * @param indices The indices to keep/drop; an empty set disables filtering
+   * @param keep Whether the indices should be kept (true) or dropped (false)
+   * @throws IllegalArgumentException if any index is negative
+   */
+  public Tokenizer(Scanner scanner, Set<Integer> indices, boolean keep) {
+    this.scanner = scanner;
+    this.indices = checkIndices(indices);
+    this.keep = keep;
+    this.current = -1;
+    this.positioned = false;
+  }
+
+  private static Set<Integer> checkIndices(Set<Integer> indices) {
+    for (Integer index : indices) {
+      if (index < 0) {
+        throw new IllegalArgumentException("All tokenizer indices must be non-negative");
+      }
+    }
+    return indices;
+  }
+
+  /**
+   * Returns true if the token at the given position must be skipped: in keep mode that is
+   * every position not listed, in drop mode every position that is listed.
+   */
+  private boolean isFiltered(int index) {
+    return keep != indices.contains(index);
+  }
+
+  /**
+   * Moves the scanner past any filtered tokens so that its next token, if there is one,
+   * is a token the caller should see. Does nothing when filtering is disabled or when the
+   * scanner has already been positioned for the upcoming read.
+   */
+  private void position() {
+    if (positioned || indices.isEmpty()) {
+      return;
+    }
+    current++;
+    // The hasNext() guard must apply in both keep and drop mode, hence the single
+    // isFiltered() predicate rather than an unparenthesized "a && b || c" chain.
+    while (scanner.hasNext() && isFiltered(current)) {
+      scanner.next();
+      current++;
+    }
+    positioned = true;
+  }
+
+  /**
+   * Positions the scanner for a read and marks the position as used, so that the
+   * following read starts from the next token index.
+   */
+  private void advance() {
+    position();
+    positioned = false;
+  }
+
+  /**
+   * Returns true if at least one more token that passes the keep/drop filter remains.
+   * Filtered tokens that precede it are consumed from the underlying {@code Scanner}.
+   */
+  public boolean hasNext() {
+    position();
+    return scanner.hasNext();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next String from the {@code Scanner}.
+   *
+   * @return The next String from the {@code Scanner}
+   */
+  public String next() {
+    advance();
+    return scanner.next();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Long from the {@code Scanner}.
+   *
+   * @return The next Long from the {@code Scanner}
+   */
+  public Long nextLong() {
+    advance();
+    return scanner.nextLong();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Boolean from the {@code Scanner}.
+   *
+   * @return The next Boolean from the {@code Scanner}
+   */
+  public Boolean nextBoolean() {
+    advance();
+    return scanner.nextBoolean();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Double from the {@code Scanner}.
+   *
+   * @return The next Double from the {@code Scanner}
+   */
+  public Double nextDouble() {
+    advance();
+    return scanner.nextDouble();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Float from the {@code Scanner}.
+   *
+   * @return The next Float from the {@code Scanner}
+   */
+  public Float nextFloat() {
+    advance();
+    return scanner.nextFloat();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Integer from the {@code Scanner}.
+   *
+   * @return The next Integer from the {@code Scanner}
+   */
+  public Integer nextInt() {
+    advance();
+    return scanner.nextInt();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
new file mode 100644
index 0000000..f43478d
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.io.Serializable;
+import java.util.Locale;
+import java.util.Scanner;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Factory class that constructs {@link Tokenizer} instances for input strings that use a fixed
+ * set of delimiters, skip patterns, locales, and sets of indices to keep or drop.
+ */
+public class TokenizerFactory implements Serializable {
+
+  // Shared default: no delimiter/skip/locale overrides and no index filtering.
+  private static final TokenizerFactory DEFAULT_INSTANCE = new TokenizerFactory(null, null, null,
+      ImmutableSet.<Integer>of(), true);
+
+  private final String delim;
+  private final String skip;
+  private final Locale locale;
+  private final Set<Integer> indices;
+  private final boolean keep;
+
+  /**
+   * Returns a default {@code TokenizerFactory} that uses whitespace as a delimiter and does
+   * not skip any input fields.
+   * @return The default {@code TokenizerFactory}
+   */
+  public static TokenizerFactory getDefaultInstance() { return DEFAULT_INSTANCE; }
+
+  private TokenizerFactory(String delim, String skip, Locale locale,
+      Set<Integer> indices, boolean keep) {
+    this.delim = delim;
+    this.skip = skip;
+    this.locale = locale;
+    this.indices = indices;
+    this.keep = keep;
+  }
+
+  /**
+   * Return a {@code Tokenizer} that wraps a {@code Scanner} over the input string, configured
+   * with the delimiter, skip, and locale settings of this {@code TokenizerFactory} and with
+   * its keep/drop indices.
+   *
+   * <p>Settings that were never configured (null) are left at the {@code Scanner} defaults.
+   * The skip pattern is applied once, at the start of the input, through
+   * {@link Scanner#skip(String)}.
+   *
+   * @param input The input string
+   * @return A new {@code Tokenizer} instance with appropriate settings
+   * @throws java.util.NoSuchElementException if a skip pattern is configured and, per
+   *     {@link Scanner#skip(String)}, it does not match at the start of the input
+   */
+  public Tokenizer create(String input) {
+    Scanner s = new Scanner(input);
+    if (delim != null) {
+      s.useDelimiter(delim);
+    }
+    if (skip != null) {
+      s.skip(skip);
+    }
+    if (locale != null) {
+      s.useLocale(locale);
+    }
+    return new Tokenizer(s, indices, keep);
+  }
+
+  /**
+   * Factory method for creating a {@code TokenizerFactory.Builder} instance.
+   * @return A new {@code TokenizerFactory.Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * A class for constructing new {@code TokenizerFactory} instances using the Builder pattern.
+   */
+  public static class Builder {
+    private String delim;
+    private String skip;
+    private Locale locale;
+    private Set<Integer> indices = ImmutableSet.of();
+    private boolean keep;
+
+    /**
+     * Copies the given indices into an immutable set and rejects negative values right
+     * away, so that a bad index fails when the builder is configured instead of each time
+     * a {@code Tokenizer} is created from the finished factory.
+     */
+    private static Set<Integer> checkedCopy(Integer[] indices) {
+      Set<Integer> copy = ImmutableSet.copyOf(indices);
+      for (Integer index : copy) {
+        Preconditions.checkArgument(index >= 0, "All tokenizer indices must be non-negative");
+      }
+      return copy;
+    }
+
+    /**
+     * Sets the delimiter used by the {@code TokenizerFactory} instances constructed by
+     * this instance.
+     * @param delim The delimiter to use, which may be a regular expression
+     * @return This {@code Builder} instance
+     */
+    public Builder delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+
+    /**
+     * Sets the regular expression that determines which input characters should be
+     * ignored by the {@code Scanner} that is returned by the constructed
+     * {@code TokenizerFactory}.
+     *
+     * @param skip The regular expression of input values to ignore
+     * @return This {@code Builder} instance
+     */
+    public Builder skip(String skip) {
+      this.skip = skip;
+      return this;
+    }
+
+    /**
+     * Sets the {@code Locale} to use with the {@code TokenizerFactory} returned by
+     * this {@code Builder} instance.
+     *
+     * @param locale The locale to use
+     * @return This {@code Builder} instance
+     */
+    public Builder locale(Locale locale) {
+      this.locale = locale;
+      return this;
+    }
+
+    /**
+     * Keep only the specified fields found by the input scanner, counting from
+     * zero.
+     *
+     * @param indices The indices to keep
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if indices were already set or any index is negative
+     */
+    public Builder keep(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set keep indices more than once");
+      this.indices = checkedCopy(indices);
+      this.keep = true;
+      return this;
+    }
+
+    /**
+     * Drop the specified fields found by the input scanner, counting from zero.
+     *
+     * @param indices The indices to drop
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if indices were already set or any index is negative
+     */
+    public Builder drop(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set drop indices more than once");
+      this.indices = checkedCopy(indices);
+      this.keep = false;
+      return this;
+    }
+
+    /**
+     * Returns a new {@code TokenizerFactory} with settings determined by this
+     * {@code Builder} instance.
+     * @return A new {@code TokenizerFactory}
+     */
+    public TokenizerFactory build() {
+      return new TokenizerFactory(delim, skip, locale, indices, keep);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
new file mode 100644
index 0000000..4da7521
--- /dev/null
+++ b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import static org.apache.crunch.contrib.text.Extractors.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.contrib.text.Parse;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests for the extractor factory methods statically imported from {@code Extractors},
+ * the keep/drop/skip/delimiter settings of {@link TokenizerFactory}, and
+ * {@code Parse.parse}.
+ */
+public class ParseTest {
+
+  @Test
+  public void testInt() {
+    Integer parsed = xint().extract("1729");
+    assertEquals(Integer.valueOf(1729), parsed);
+
+    // An unparseable token yields the extractor's configured default.
+    Integer fallback = xint(321).extract("foo");
+    assertEquals(Integer.valueOf(321), fallback);
+  }
+
+  @Test
+  public void testString() {
+    String parsed = xstring().extract("bar");
+    assertEquals("bar", parsed);
+  }
+
+  @Test
+  public void testPairWithDrop() {
+    // Fields 0 ("foo") and 2 ("17.29") are dropped before extraction.
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").drop(0, 2).build();
+    Pair<Integer, String> actual = xpair(sf, xint(), xstring()).extract("foo,1,17.29,abc");
+    assertEquals(Pair.of(1, "abc"), actual);
+  }
+
+  @Test
+  public void testTripsWithSkip() {
+    // The leading "foo" is skipped, leaving "17;abc;3.4" to tokenize.
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").skip("^foo").build();
+    Tuple3<Integer, String, Float> actual =
+        xtriple(sf, xint(), xstring(), xfloat()).extract("foo17;abc;3.4");
+    assertEquals(Tuple3.of(17, "abc", 3.4f), actual);
+  }
+
+  @Test
+  public void testTripsWithKeep() {
+    // Only fields 1-3 are kept; field 0 ("foo") is ignored.
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").keep(1, 2, 3).build();
+    Tuple3<Integer, String, Float> actual =
+        xtriple(sf, xint(), xstring(), xfloat()).extract("foo;17;abc;3.4");
+    assertEquals(Tuple3.of(17, "abc", 3.4f), actual);
+  }
+
+  @Test
+  public void testQuadsWithWhitespace() {
+    TokenizerFactory sf = TokenizerFactory.getDefaultInstance();
+    Object actual =
+        xquad(sf, xdouble(), xstring(), xboolean(), xlong()).extract("1.3 foo true 1");
+    assertEquals(Tuple4.of(1.3, "foo", true, 1L), actual);
+  }
+
+  @Test
+  public void testTupleN() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").build();
+    Object actual =
+        xtupleN(sf, xint(), xboolean(), xboolean(), xint(), xint()).extract("1,false,true,2,3");
+    assertEquals(new TupleN(1, false, true, 2, 3), actual);
+  }
+
+  @Test
+  public void testCollections() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(";").build();
+    // 3000 stands in for any element that cannot be parsed as an int.
+    Extractor<Collection<Integer>> ints = xcollect(sf, xint(3000));
+
+    // A clean record: no error is flagged.
+    assertEquals(ImmutableList.of(1, 2, 3), ints.extract("1;2;3"));
+    assertFalse(ints.errorOnLastRecord());
+
+    // A record with one bad element: the default is substituted and the error is counted.
+    assertEquals(ImmutableList.of(17, 29, 3000), ints.extract("17;29;a"));
+    assertTrue(ints.errorOnLastRecord());
+    assertEquals(1, ints.getStats().getErrorCount());
+  }
+
+  @Test
+  public void testNestedComposites() {
+    // ';' separates the two composite halves, ',' separates the fields inside each half.
+    TokenizerFactory outer = TokenizerFactory.builder().delimiter(";").build();
+    TokenizerFactory inner = TokenizerFactory.builder().delimiter(",").build();
+    Extractor<Pair<Long, Integer>> left = xpair(inner, xlong(), xint());
+    Extractor<Tuple3<String, Integer, Float>> right = xtriple(inner, xstring(), xint(), xfloat());
+    Extractor<Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>>> extractor =
+        xpair(outer, left, right);
+
+    Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>> expected =
+        Pair.of(Pair.of(1L, 2), Tuple3.of("a", 17, 29f));
+    assertEquals(expected, extractor.extract("1,2;a,17,29"));
+  }
+
+  @Test
+  public void testParse() {
+    TokenizerFactory sf = TokenizerFactory.builder().delimiter(",").build();
+    PCollection<String> lines = MemPipeline.typedCollectionOf(Avros.strings(), "1,3.0");
+    Extractor<Pair<Integer, Float>> extractor = xpair(sf, xint(), xfloat());
+    Iterable<Pair<Integer, Float>> it = Parse.parse("test", lines, extractor).materialize();
+    assertEquals(ImmutableList.of(Pair.of(1, 3.0f)), it);
+  }
+}