You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/03 00:25:01 UTC

git commit: CRUNCH-29: Add a string concatenating aggregator for use with CombineFns. Contributed by Gauthier Ambard.

Updated Branches:
  refs/heads/master fbc741f10 -> a7b98a9e0


CRUNCH-29: Add a string concatenating aggregator for use with CombineFns. Contributed by Gauthier Ambard.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a7b98a9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a7b98a9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a7b98a9e

Branch: refs/heads/master
Commit: a7b98a9e0ffc50d6bd2ae643607a33d1f3de72d3
Parents: fbc741f
Author: jwills <jw...@apache.org>
Authored: Wed Aug 1 09:42:26 2012 -0700
Committer: jwills <jw...@apache.org>
Committed: Thu Aug 2 15:09:49 2012 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/CombineFn.java |   48 +++++++++++++++
 .../test/java/org/apache/crunch/CombineFnTest.java |   23 +++++++
 2 files changed, 71 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index fe65d74..27183a9 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -25,6 +25,7 @@ import java.util.SortedSet;
 
 import org.apache.crunch.util.Tuples;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -334,6 +335,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     return aggregator(new LastNAggregator<V>(n));
   }
 
+
+  /**
+   * Used to concatenate strings, with a separator between each string.
+   * 
+   * @param separator
+   *            the separator which will be inserted between consecutive strings
+   * @param skipNull
+   *            defines whether null values are skipped. If set to false
+   *            and a null value is encountered, a NullPointerException
+   *            is thrown.
+   * @return a CombineFn built from a {@link StringConcatAggregator} with these settings
+   */
+  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
+    return aggregator(new StringConcatAggregator(separator, skipNull));
+  }
+
   public static class SumLongs implements Aggregator<Long> {
     private long sum = 0;
 
@@ -849,4 +866,35 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
       return ImmutableList.copyOf(elements);
     }
   }
+
+  public static class StringConcatAggregator implements Aggregator<String> { // joins every updated string into one result
+    private final String separator; // handed to Joiner.on(...) when the joiner is created
+    private final boolean skipNulls; // true: joiner skips nulls; false: plain Joiner, which throws NullPointerException on a null element
+    private final LinkedList<String> list = new LinkedList<String>(); // values added by update() since the last reset()
+
+    private transient Joiner joiner; // transient; created lazily by the first reset() rather than in the constructor
+    
+    public StringConcatAggregator(final String separator, final boolean skipNulls) {
+      this.separator = separator;
+      this.skipNulls = skipNulls;
+    }
+
+    @Override
+    public void reset() { // empties the buffered values; the first call also builds the joiner
+      if (joiner == null) {
+        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+      }
+      list.clear();
+    }
+
+    @Override
+    public void update(final String next) { // buffers next unchanged; nulls are not filtered at this point
+      list.add(next);
+    }
+
+    @Override
+    public Iterable<String> results() { // returns a one-element list holding the joined string
+      return ImmutableList.of(joiner.join(list)); // NOTE(review): joiner is only assigned in reset(); still null here if reset() was never called
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
index 4f08bbe..82bdf00 100644
--- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -35,6 +35,7 @@ import static org.apache.crunch.CombineFn.SUM_LONGS;
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigInteger;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.crunch.CombineFn.Aggregator;
@@ -45,6 +46,7 @@ import org.apache.crunch.CombineFn.MaxNAggregator;
 import org.apache.crunch.CombineFn.MinNAggregator;
 import org.apache.crunch.CombineFn.PairAggregator;
 import org.apache.crunch.CombineFn.QuadAggregator;
+import org.apache.crunch.CombineFn.StringConcatAggregator;
 import org.apache.crunch.CombineFn.TripAggregator;
 import org.apache.crunch.CombineFn.TupleNAggregator;
 import org.junit.Test;
@@ -183,4 +185,25 @@ public class CombineFnTest {
         MIN_DOUBLES.create(), MAX_LONGS.create());
     assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
+
+  @Test
+  public void testConcatenation() { // checks StringConcatAggregator output for several separators and inputs
+    String[] arrayNull = new String[] { null, "" }; // wrapped with Arrays.asList below; presumably because ImmutableList.of rejects nulls
+    assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator( // empty separator: plain concatenation
+        new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar")));
+
+    assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator( // "/" separator appears only between elements
+        new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("  "), applyAggregator( // " " + separator " " + "": an empty string still gets a separator
+        new StringConcatAggregator(" ", true), ImmutableList.of(" ", "")));
+    assertEquals(ImmutableList.of(""), applyAggregator( // skipNulls=true: the null is dropped, leaving only ""
+        new StringConcatAggregator(" ", true), Arrays.asList(arrayNull)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConcatenationNullException() { // skipNulls=false with a null input is expected to throw
+    String[] arrayNull = new String[] { null, "" }; // same null-containing input as in testConcatenation
+    assertEquals(ImmutableList.of(""), applyAggregator( // the test passes only if a NullPointerException propagates from this statement
+        new StringConcatAggregator(" ", false), Arrays.asList(arrayNull)));
+  }
 }