You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/09/27 20:45:18 UTC

crunch git commit: CRUNCH-563: Add support for BigDecimal aggregators. Contributed by Vasu Doppalapudi.

Repository: crunch
Updated Branches:
  refs/heads/master e884bf238 -> f629b8e6b


CRUNCH-563: Add support for BigDecimal aggregators. Contributed by Vasu Doppalapudi.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f629b8e6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f629b8e6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f629b8e6

Branch: refs/heads/master
Commit: f629b8e6bc1bf9f0fc3f5f6e3c5d28a01c593855
Parents: e884bf2
Author: Josh Wills <jw...@apache.org>
Authored: Sun Sep 27 11:45:03 2015 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Sep 27 11:45:03 2015 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/WordCountIT.java  | 41 ++++++++++++-
 crunch-core/src/it/resources/bigdecimal.txt     |  6 ++
 .../java/org/apache/crunch/fn/Aggregators.java  | 64 ++++++++++++++++++++
 .../java/org/apache/crunch/types/PTypes.java    | 40 ++++++++++++
 .../org/apache/crunch/fn/AggregatorsTest.java   | 21 +++++++
 5 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index c4e1d58..4c77c41 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
@@ -34,8 +35,10 @@ import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -68,7 +71,6 @@ public class WordCountIT {
 
   public static PTable<String, Long> substr(PTable<String, Long> ptable) {
     return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-
       @Override
       public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
         if (!input.first().isEmpty()) {
@@ -77,9 +79,19 @@ public class WordCountIT {
       }
     }, ptable.getPTableType());
   }
+  
+  public static PTable<String, BigDecimal> convDecimal(PCollection<String> ptable) { // "key~decimal" lines -> (key, BigDecimal) table
+    return ptable.parallelDo(new DoFn<String, Pair<String, BigDecimal>>() {
+      @Override // NOTE(review): the line is split on "~" twice per record; a local array would avoid the repeat.
+      public void process(String input, Emitter<Pair<String, BigDecimal>> emitter) {
+        emitter.emit(Pair.of(input.split("~")[0], new BigDecimal(input.split("~")[1])));
+      } // a line without "~" makes the [1] lookup throw ArrayIndexOutOfBoundsException
+    }, Writables.tableOf(Writables.strings(), PTypes.bigDecimal(WritableTypeFamily.getInstance())));
+  }
 
   private boolean runSecond = false;
   private boolean useToOutput = false;
+  private boolean testBigDecimal = false;
 
   @Test
   public void testWritables() throws IOException {
@@ -98,6 +110,14 @@ public class WordCountIT {
     useToOutput = true;
     run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
   }
+  
+  @Test // Runs the shared pipeline with the Writable type family and the BigDecimal summing path switched on.
+  public void testWritablesForBigDecimal() throws IOException {
+    runSecond = false;
+    useToOutput = true;
+    testBigDecimal = true; // flag checked by the shared pipeline code to add and verify the SUM_BIGDECIMALS stage
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
 
   @Test
   public void testAvro() throws IOException {
@@ -149,10 +169,23 @@ public class WordCountIT {
       PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
       pipeline.writeTextFile(we, substrPath);
     }
+    
+    PTable<String, BigDecimal> bd = null;
+    if (testBigDecimal) {
+      String decimalInputPath = tmpDir.copyResourceFileName("bigdecimal.txt");
+	  PCollection<String> testBd = pipeline.read(At.textFile(decimalInputPath, typeFamily.strings()));
+      bd = convDecimal(testBd).groupByKey().combineValues(Aggregators.SUM_BIGDECIMALS());
+    }
+    
     PipelineResult res = pipeline.done();
     assertTrue(res.succeeded());
     List<PipelineResult.StageResult> stageResults = res.getStageResults();
-    if (runSecond) {
+    if (testBigDecimal) {
+      assertEquals(1, stageResults.size());
+      assertEquals(
+          ImmutableList.of(Pair.of("A", bigDecimal("3.579")), Pair.of("B", bigDecimal("11.579")),
+          Pair.of("C", bigDecimal("15.642"))), Lists.newArrayList(bd.materialize()));
+    } else if (runSecond) {
       assertEquals(2, stageResults.size());
     } else {
       assertEquals(1, stageResults.size());
@@ -170,4 +203,8 @@ public class WordCountIT {
     }
     assertTrue(passed);
   }
+  
+  private static BigDecimal bigDecimal(String value) { // shorthand for building expected values in assertions
+    return new BigDecimal(value); // String constructor keeps the literal's exact digits and scale
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/resources/bigdecimal.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/bigdecimal.txt b/crunch-core/src/it/resources/bigdecimal.txt
new file mode 100644
index 0000000..f4712d4
--- /dev/null
+++ b/crunch-core/src/it/resources/bigdecimal.txt
@@ -0,0 +1,6 @@
+A~1.234
+B~2.345
+C~7.321
+A~2.345
+B~9.234
+C~8.321
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index c5b0c21..cca3ddb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.fn;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.LinkedList;
 import java.util.List;
@@ -109,6 +110,14 @@ public final class Aggregators {
   public static Aggregator<BigInteger> SUM_BIGINTS() {
     return new SumBigInts();
   }
+  
+  /**
+   * Sum up all {@link BigDecimal} values; the running total starts at {@link BigDecimal#ZERO}.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigDecimal> SUM_BIGDECIMALS() {
+    return new SumBigDecimals();
+  }
 
   /**
    * Return the maximum of all given {@link Comparable} values.
@@ -207,6 +216,24 @@ public final class Aggregators {
   public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
     return new MaxNAggregator<BigInteger>(n);
   }
+  
+  /**
+   * Return the maximum of all given {@link BigDecimal} values, using the generic {@code MaxComparables} aggregator.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigDecimal> MAX_BIGDECIMALS() {
+    return new MaxComparables<BigDecimal>();
+  }
+
+  /**
+   * Return the {@code n} largest {@link BigDecimal} values (or fewer if there are fewer
+   * values than {@code n}), using the generic {@code MaxNAggregator}.
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigDecimal> MAX_BIGDECIMALS(int n) {
+    return new MaxNAggregator<BigDecimal>(n);
+  }
 
   /**
    * Return the {@code n} largest values (or fewer if there are fewer
@@ -327,6 +354,24 @@ public final class Aggregators {
   public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
     return new MinNAggregator<BigInteger>(n);
   }
+  
+  /**
+   * Return the minimum of all given {@link BigDecimal} values, using the generic {@code MinComparables} aggregator.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigDecimal> MIN_BIGDECIMALS() {
+    return new MinComparables<BigDecimal>();
+  }
+
+  /**
+   * Return the {@code n} smallest {@link BigDecimal} values (or fewer if there are fewer
+   * values than {@code n}), using the generic {@code MinNAggregator}.
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigDecimal> MIN_BIGDECIMALS(int n) {
+    return new MinNAggregator<BigDecimal>(n);
+  }
 
   /**
    * Return the {@code n} smallest values (or fewer if there are fewer
@@ -637,6 +682,25 @@ public final class Aggregators {
       return ImmutableList.of(sum);
     }
   }
+  
+  private static class SumBigDecimals extends SimpleAggregator<BigDecimal> { // keeps a running BigDecimal total
+    private BigDecimal total = BigDecimal.ZERO;
+    // Start a fresh aggregation from zero.
+    @Override
+    public void reset() {
+      this.total = BigDecimal.ZERO;
+    }
+    // Fold the next value into the total; BigDecimal.add is exact, so nothing is rounded here.
+    @Override
+    public void update(BigDecimal next) {
+      this.total = this.total.add(next);
+    }
+    // Emit the accumulated total as the single result.
+    @Override
+    public Iterable<BigDecimal> results() {
+      return ImmutableList.of(this.total);
+    }
+  }
 
   private static class MaxComparables<C extends Comparable<C>> extends SimpleAggregator<C> {
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
index 82604ac..8715e28 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.types;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.UUID;
@@ -49,6 +50,13 @@ public class PTypes {
   public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
     return typeFamily.derivedImmutable(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
   }
+  
+  /**
+   * A PType for Java's {@link BigDecimal} type, derived from the family's bytes type via {@code BYTE_TO_BIGDECIMAL} and {@code BIGDECIMAL_TO_BYTE}.
+   */
+  public static PType<BigDecimal> bigDecimal(PTypeFamily typeFamily) {
+    return typeFamily.derivedImmutable(BigDecimal.class, BYTE_TO_BIGDECIMAL, BIGDECIMAL_TO_BYTE, typeFamily.bytes());
+  }
 
   /**
    * A PType for Java's {@link UUID} type.
@@ -115,6 +123,20 @@ public class PTypes {
     }
   };
 
+  public static final MapFn<ByteBuffer, BigDecimal> BYTE_TO_BIGDECIMAL = new MapFn<ByteBuffer, BigDecimal>() {
+    @Override // Decodes the scale-then-unscaled-bytes layout written by BIGDECIMAL_TO_BYTE.
+    public BigDecimal map(ByteBuffer input) {
+      return input == null ? null : byteBufferToBigDecimal(input); // a null buffer maps to null
+    }
+  };
+
+  public static final MapFn<BigDecimal, ByteBuffer> BIGDECIMAL_TO_BYTE = new MapFn<BigDecimal, ByteBuffer>() {
+    @Override // Encodes the value as its int scale followed by the bytes of its unscaled value.
+    public ByteBuffer map(BigDecimal input) {
+      return input == null ? null : bigDecimalToByteBuffer(input); // a null value maps to null
+    }
+  };
+
   private static class JacksonInputMapFn<T> extends MapFn<String, T> {
 
     private final Class<T> clazz;
@@ -298,4 +320,22 @@ public class PTypes {
       return bb;
     }
   };
+
+  /** Decodes a 4-byte scale plus two's-complement unscaled bytes; reads a duplicate so {@code input} keeps its position. */
+  private static BigDecimal byteBufferToBigDecimal(ByteBuffer input) {
+    ByteBuffer view = input.duplicate(); // same bytes, independent position: the caller's buffer can be decoded again
+    int scale = view.getInt();
+    byte[] unscaled = new byte[view.remaining()];
+    view.get(unscaled);
+    return new BigDecimal(new BigInteger(unscaled), scale);
+  }
+
+  /** Encodes {@code input} as its 4-byte scale followed by the two's-complement bytes of its unscaled value. */
+  private static ByteBuffer bigDecimalToByteBuffer(BigDecimal input) {
+    byte[] unscaled = input.unscaledValue().toByteArray();
+    ByteBuffer encoded = ByteBuffer.allocate(4 + unscaled.length);
+    encoded.putInt(input.scale()).put(unscaled);
+    encoded.rewind(); // back to position 0 so a reader sees the scale first
+    return encoded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
index 57dc8f0..973cbb1 100644
--- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.List;
@@ -51,6 +52,7 @@ public class AggregatorsTest {
     assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f));
     assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001)));
     assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10")));
+    assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("1.122"), bigDecimal("0.654")), is(bigDecimal("1.776")));
   }
 
   @Test
@@ -61,6 +63,7 @@ public class AggregatorsTest {
     assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f));
     assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0));
     assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775")));
+    assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1777.739")));
   }
 
   @Test
@@ -71,6 +74,7 @@ public class AggregatorsTest {
     assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0));
     assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f));
     assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729")));
+    assertThat(sapply(MAX_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1729.876")));
     assertThat(sapply(Aggregators.<String>MAX_COMPARABLES(), "b", "a", "d", "c"), is("d"));
   }
 
@@ -82,6 +86,7 @@ public class AggregatorsTest {
     assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0));
     assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29));
     assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17")));
+    assertThat(sapply(MIN_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("17.876")));
     assertThat(sapply(Aggregators.<String>MIN_COMPARABLES(), "b", "a", "d", "c"), is("a"));
   }
 
@@ -153,8 +158,14 @@ public class AggregatorsTest {
         Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
     Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator(
         MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES());
+    
+    List<Tuple3<Float, BigDecimal, BigDecimal>> input1 = ImmutableList.of(Tuple3.of(17.29f, bigDecimal("12.2"), bigDecimal("0.1")),
+        Tuple3.of(3.0f, bigDecimal("1.2"), bigDecimal("3.14")), Tuple3.of(-1.0f, bigDecimal("14.5"), bigDecimal("-0.98")));
+    Aggregator<Tuple3<Float, BigDecimal, BigDecimal>> b = Aggregators.tripAggregator(
+        MAX_FLOATS(), MAX_BIGDECIMALS(), MIN_BIGDECIMALS());
 
     assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98)));
+    assertThat(sapply(b, input1), is(Tuple3.of(17.29f, bigDecimal("14.5"), bigDecimal("-0.98"))));
   }
 
   @Test
@@ -163,8 +174,14 @@ public class AggregatorsTest {
         Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
     Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator(
         MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
+    
+    List<Tuple4<BigDecimal, Double, Double, Integer>> input1 = ImmutableList.of(Tuple4.of(bigDecimal("17.29"), 12.2, 0.1, 1),
+        Tuple4.of(bigDecimal("3.0"), 1.2, 3.14, 2), Tuple4.of(bigDecimal("-1.0"), 14.5, -0.98, 3));
+    Aggregator<Tuple4<BigDecimal, Double, Double, Integer>> b = Aggregators.quadAggregator(
+        MAX_BIGDECIMALS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
 
     assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6)));
+    assertThat(sapply(b, input1), is(Tuple4.of(bigDecimal("17.29"), 14.5, -0.98, 6)));
   }
 
   @Test
@@ -233,4 +250,8 @@ public class AggregatorsTest {
   private static BigInteger bigInt(String value) {
     return new BigInteger(value);
   }
+  
+  private static BigDecimal bigDecimal(String value) { // test shorthand, mirroring the bigInt(String) helper above
+    return new BigDecimal(value); // String constructor keeps the literal's exact digits and scale
+  }
 }