You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/03 00:25:01 UTC
git commit: CRUNCH-29: Add a string concatenating aggregator for use
with CombineFns. Contributed by Gauthier Ambard.
Updated Branches:
refs/heads/master fbc741f10 -> a7b98a9e0
CRUNCH-29: Add a string concatenating aggregator for use with CombineFns. Contributed by Gauthier Ambard.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a7b98a9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a7b98a9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a7b98a9e
Branch: refs/heads/master
Commit: a7b98a9e0ffc50d6bd2ae643607a33d1f3de72d3
Parents: fbc741f
Author: jwills <jw...@apache.org>
Authored: Wed Aug 1 09:42:26 2012 -0700
Committer: jwills <jw...@apache.org>
Committed: Thu Aug 2 15:09:49 2012 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/crunch/CombineFn.java | 48 +++++++++++++++
.../test/java/org/apache/crunch/CombineFnTest.java | 23 +++++++
2 files changed, 71 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index fe65d74..27183a9 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -25,6 +25,7 @@ import java.util.SortedSet;
import org.apache.crunch.util.Tuples;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -334,6 +335,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
return aggregator(new LastNAggregator<V>(n));
}
+
+ /**
+ * Returns a {@code CombineFn} that concatenates strings, with a separator between each string.
+ *
+ * @param separator
+ * the separator which will be appended between each string
+ * @param skipNull
+ * whether null values are skipped. When set to false and there is a
+ * null value, a NullPointerException is thrown instead of the null
+ * being dropped.
+ * @return a CombineFn created by aggregator(...) around a new StringConcatAggregator
+ */
+ public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
+ return aggregator(new StringConcatAggregator(separator, skipNull));
+ }
+
public static class SumLongs implements Aggregator<Long> {
private long sum = 0;
@@ -849,4 +866,35 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
return ImmutableList.copyOf(elements);
}
}
+
+ public static class StringConcatAggregator implements Aggregator<String> { // joins every value given to update() into a single string
+ private final String separator;
+ private final boolean skipNulls;
+ private final LinkedList<String> list = new LinkedList<String>();
+ // Declared transient and assigned in reset() rather than in the constructor.
+ private transient Joiner joiner;
+
+ public StringConcatAggregator(final String separator, final boolean skipNulls) {
+ this.separator = separator;
+ this.skipNulls = skipNulls;
+ }
+ // Creates the joiner on the first call (honouring skipNulls) and empties the buffered strings.
+ @Override
+ public void reset() {
+ if (joiner == null) {
+ joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+ }
+ list.clear();
+ }
+ // Buffers next as-is, nulls included; null handling is left to the joiner in results().
+ @Override
+ public void update(final String next) {
+ list.add(next);
+ }
+ // One-element result: the joined buffer. NOTE(review): joiner is only set in reset(); presumably reset() always runs first - confirm.
+ @Override
+ public Iterable<String> results() {
+ return ImmutableList.of(joiner.join(list));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
index 4f08bbe..82bdf00 100644
--- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -35,6 +35,7 @@ import static org.apache.crunch.CombineFn.SUM_LONGS;
import static org.junit.Assert.assertEquals;
import java.math.BigInteger;
+import java.util.Arrays;
import java.util.List;
import org.apache.crunch.CombineFn.Aggregator;
@@ -45,6 +46,7 @@ import org.apache.crunch.CombineFn.MaxNAggregator;
import org.apache.crunch.CombineFn.MinNAggregator;
import org.apache.crunch.CombineFn.PairAggregator;
import org.apache.crunch.CombineFn.QuadAggregator;
+import org.apache.crunch.CombineFn.StringConcatAggregator;
import org.apache.crunch.CombineFn.TripAggregator;
import org.apache.crunch.CombineFn.TupleNAggregator;
import org.junit.Test;
@@ -183,4 +185,25 @@ public class CombineFnTest {
MIN_DOUBLES.create(), MAX_LONGS.create());
assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input)));
}
+
+ @Test
+ public void testConcatenation() { // non-throwing cases: "", "/" and " " separators, with and without skipNulls
+ String[] arrayNull = new String[] { null, "" };
+ assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator(
+ new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar")));
+ // skipNulls is false below, but this input contains no nulls.
+ assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator(
+ new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar")));
+ assertEquals(ImmutableList.of(" "), applyAggregator(
+ new StringConcatAggregator(" ", true), ImmutableList.of(" ", ""))); // NOTE(review): joining " " and "" with a " " separator should give two spaces; the one-space literal above may be whitespace collapsed by this archived copy - verify against the repository
+ assertEquals(ImmutableList.of(""), applyAggregator(
+ new StringConcatAggregator(" ", true), Arrays.asList(arrayNull))); // the null is skipped, leaving only ""
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testConcatenationNullException() { // same { null, "" } input as the skipNulls=true case, but with skipNulls=false
+ String[] arrayNull = new String[] { null, "" };
+ assertEquals(ImmutableList.of(""), applyAggregator(
+ new StringConcatAggregator(" ", false), Arrays.asList(arrayNull)));
+ }
}