You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/09/27 20:45:18 UTC
crunch git commit: CRUNCH-563: Add support for BigDecimal
aggregators. Contributed by Vasu Doppalapudi.
Repository: crunch
Updated Branches:
refs/heads/master e884bf238 -> f629b8e6b
CRUNCH-563: Add support for BigDecimal aggregators. Contributed by Vasu Doppalapudi.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f629b8e6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f629b8e6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f629b8e6
Branch: refs/heads/master
Commit: f629b8e6bc1bf9f0fc3f5f6e3c5d28a01c593855
Parents: e884bf2
Author: Josh Wills <jw...@apache.org>
Authored: Sun Sep 27 11:45:03 2015 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Sep 27 11:45:03 2015 -0700
----------------------------------------------------------------------
.../it/java/org/apache/crunch/WordCountIT.java | 41 ++++++++++++-
crunch-core/src/it/resources/bigdecimal.txt | 6 ++
.../java/org/apache/crunch/fn/Aggregators.java | 64 ++++++++++++++++++++
.../java/org/apache/crunch/types/PTypes.java | 40 ++++++++++++
.../org/apache/crunch/fn/AggregatorsTest.java | 21 +++++++
5 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index c4e1d58..4c77c41 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
@@ -34,8 +35,10 @@ import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
import org.junit.Rule;
import org.junit.Test;
@@ -68,7 +71,6 @@ public class WordCountIT {
public static PTable<String, Long> substr(PTable<String, Long> ptable) {
return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-
@Override
public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
if (!input.first().isEmpty()) {
@@ -77,9 +79,19 @@ public class WordCountIT {
}
}, ptable.getPTableType());
}
+
+ public static PTable<String, BigDecimal> convDecimal(PCollection<String> ptable) {
+ return ptable.parallelDo(new DoFn<String, Pair<String, BigDecimal>>() {
+ @Override
+ public void process(String input, Emitter<Pair<String, BigDecimal>> emitter) {
+ emitter.emit(Pair.of(input.split("~")[0], new BigDecimal(input.split("~")[1])));
+ }
+ }, Writables.tableOf(Writables.strings(), PTypes.bigDecimal(WritableTypeFamily.getInstance())));
+ }
private boolean runSecond = false;
private boolean useToOutput = false;
+ private boolean testBigDecimal = false;
@Test
public void testWritables() throws IOException {
@@ -98,6 +110,14 @@ public class WordCountIT {
useToOutput = true;
run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
}
+
+ /**
+ * Runs the pipeline on the Writable type family with the BigDecimal branch of {@code run}
+ * enabled via the {@code testBigDecimal} flag.
+ */
+ @Test
+ public void testWritablesForBigDecimal() throws IOException {
+ runSecond = false;
+ useToOutput = true;
+ testBigDecimal = true;
+ run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+ }
@Test
public void testAvro() throws IOException {
@@ -149,10 +169,23 @@ public class WordCountIT {
PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
pipeline.writeTextFile(we, substrPath);
}
+
+ PTable<String, BigDecimal> bd = null;
+ if (testBigDecimal) {
+ String decimalInputPath = tmpDir.copyResourceFileName("bigdecimal.txt");
+ PCollection<String> testBd = pipeline.read(At.textFile(decimalInputPath, typeFamily.strings()));
+ bd = convDecimal(testBd).groupByKey().combineValues(Aggregators.SUM_BIGDECIMALS());
+ }
+
PipelineResult res = pipeline.done();
assertTrue(res.succeeded());
List<PipelineResult.StageResult> stageResults = res.getStageResults();
- if (runSecond) {
+ if (testBigDecimal) {
+ assertEquals(1, stageResults.size());
+ assertEquals(
+ ImmutableList.of(Pair.of("A", bigDecimal("3.579")), Pair.of("B", bigDecimal("11.579")),
+ Pair.of("C", bigDecimal("15.642"))), Lists.newArrayList(bd.materialize()));
+ } else if (runSecond) {
assertEquals(2, stageResults.size());
} else {
assertEquals(1, stageResults.size());
@@ -170,4 +203,8 @@ public class WordCountIT {
}
assertTrue(passed);
}
+
+ /** Shorthand for building a {@link BigDecimal} from its decimal string form. */
+ private static BigDecimal bigDecimal(String value) {
+ return new BigDecimal(value);
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/resources/bigdecimal.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/bigdecimal.txt b/crunch-core/src/it/resources/bigdecimal.txt
new file mode 100644
index 0000000..f4712d4
--- /dev/null
+++ b/crunch-core/src/it/resources/bigdecimal.txt
@@ -0,0 +1,6 @@
+A~1.234
+B~2.345
+C~7.321
+A~2.345
+B~9.234
+C~8.321
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index c5b0c21..cca3ddb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.fn;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.LinkedList;
import java.util.List;
@@ -109,6 +110,14 @@ public final class Aggregators {
public static Aggregator<BigInteger> SUM_BIGINTS() {
return new SumBigInts();
}
+
+ /**
+ * Sum up all {@link BigDecimal} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigDecimal> SUM_BIGDECIMALS() {
+ return new SumBigDecimals();
+ }
/**
* Return the maximum of all given {@link Comparable} values.
@@ -207,6 +216,24 @@ public final class Aggregators {
public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
return new MaxNAggregator<BigInteger>(n);
}
+
+ /**
+ * Return the maximum of all given {@link BigDecimal} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigDecimal> MAX_BIGDECIMALS() {
+ return new MaxComparables<BigDecimal>();
+ }
+
+ /**
+ * Return the {@code n} largest {@link BigDecimal} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigDecimal> MAX_BIGDECIMALS(int n) {
+ return new MaxNAggregator<BigDecimal>(n);
+ }
/**
* Return the {@code n} largest values (or fewer if there are fewer
@@ -327,6 +354,24 @@ public final class Aggregators {
public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
return new MinNAggregator<BigInteger>(n);
}
+
+ /**
+ * Return the minimum of all given {@link BigDecimal} values.
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigDecimal> MIN_BIGDECIMALS() {
+ return new MinComparables<BigDecimal>();
+ }
+
+ /**
+ * Return the {@code n} smallest {@link BigDecimal} values (or fewer if there are fewer
+ * values than {@code n}).
+ * @param n The number of values to return
+ * @return The newly constructed instance
+ */
+ public static Aggregator<BigDecimal> MIN_BIGDECIMALS(int n) {
+ return new MinNAggregator<BigDecimal>(n);
+ }
/**
* Return the {@code n} smallest values (or fewer if there are fewer
@@ -637,6 +682,25 @@ public final class Aggregators {
return ImmutableList.of(sum);
}
}
+
+ private static class SumBigDecimals extends SimpleAggregator<BigDecimal> {
+ private BigDecimal sum = BigDecimal.ZERO;
+
+ @Override
+ public void reset() {
+ sum = BigDecimal.ZERO;
+ }
+
+ @Override
+ public void update(BigDecimal next) {
+ sum = sum.add(next);
+ }
+
+ @Override
+ public Iterable<BigDecimal> results() {
+ return ImmutableList.of(sum);
+ }
+ }
private static class MaxComparables<C extends Comparable<C>> extends SimpleAggregator<C> {
http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
index 82604ac..8715e28 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.types;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;
@@ -49,6 +50,13 @@ public class PTypes {
public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
}
+
+ /**
+ * A PType for Java's {@link BigDecimal} type, derived from the given family's bytes type.
+ *
+ * @param typeFamily the type family that supplies the underlying bytes PType
+ * @return an immutable derived PType that converts with {@link #BYTE_TO_BIGDECIMAL} and
+ * {@link #BIGDECIMAL_TO_BYTE}
+ */
+ public static PType<BigDecimal> bigDecimal(PTypeFamily typeFamily) {
+ return typeFamily.derivedImmutable(BigDecimal.class, BYTE_TO_BIGDECIMAL, BIGDECIMAL_TO_BYTE, typeFamily.bytes());
+ }
/**
* A PType for Java's {@link UUID} type.
@@ -115,6 +123,20 @@ public class PTypes {
}
};
+ /**
+  * Maps an encoded {@link ByteBuffer} to the {@link BigDecimal} it holds; a {@code null}
+  * input is passed through as {@code null}.
+  */
+ public static final MapFn<ByteBuffer, BigDecimal> BYTE_TO_BIGDECIMAL = new MapFn<ByteBuffer, BigDecimal>() {
+   @Override
+   public BigDecimal map(ByteBuffer input) {
+     if (input == null) {
+       return null;
+     }
+     return byteBufferToBigDecimal(input);
+   }
+ };
+
+ /**
+  * Maps a {@link BigDecimal} to its encoded {@link ByteBuffer} form; a {@code null} input is
+  * passed through as {@code null}.
+  */
+ public static final MapFn<BigDecimal, ByteBuffer> BIGDECIMAL_TO_BYTE = new MapFn<BigDecimal, ByteBuffer>() {
+   @Override
+   public ByteBuffer map(BigDecimal input) {
+     if (input == null) {
+       return null;
+     }
+     return bigDecimalToByteBuffer(input);
+   }
+ };
+
private static class JacksonInputMapFn<T> extends MapFn<String, T> {
private final Class<T> clazz;
@@ -298,4 +320,22 @@ public class PTypes {
return bb;
}
};
+
+ /**
+  * Decodes a {@link BigDecimal} from a buffer laid out as a 4-byte scale followed by the
+  * two's-complement bytes of the unscaled value (the layout produced by
+  * {@code bigDecimalToByteBuffer}).
+  *
+  * <p>Reading happens on a duplicate of {@code input}, so the caller's buffer position is not
+  * advanced and the same buffer can be decoded more than once.
+  *
+  * @param input buffer whose remaining bytes hold the encoded value; must not be null
+  * @return the decoded value
+  * @throws java.nio.BufferUnderflowException if fewer than four bytes remain
+  * @throws NumberFormatException if no bytes follow the scale
+  */
+ private static BigDecimal byteBufferToBigDecimal(ByteBuffer input) {
+   // duplicate() shares content but has its own position/limit; it resets the byte order,
+   // so carry the original order over to read the scale exactly as before.
+   ByteBuffer view = input.duplicate().order(input.order());
+   int scale = view.getInt();
+   byte[] unscaledBytes = new byte[view.remaining()];
+   view.get(unscaledBytes);
+   return new BigDecimal(new BigInteger(unscaledBytes), scale);
+ }
+
+ /**
+  * Encodes a {@link BigDecimal} as a 4-byte scale followed by the two's-complement bytes of
+  * its unscaled value.
+  *
+  * @param input the value to encode; must not be null
+  * @return a heap buffer positioned at zero whose limit covers the whole encoding
+  */
+ private static ByteBuffer bigDecimalToByteBuffer(BigDecimal input) {
+   byte[] magnitude = input.unscaledValue().toByteArray();
+   ByteBuffer encoded = ByteBuffer.allocate(4 + magnitude.length);
+   encoded.putInt(input.scale());
+   encoded.put(magnitude);
+   // The buffer is now full, so flipping yields position 0 and limit == capacity.
+   encoded.flip();
+   return encoded;
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
index 57dc8f0..973cbb1 100644
--- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
@@ -51,6 +52,7 @@ public class AggregatorsTest {
assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f));
assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001)));
assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10")));
+ assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("1.122"), bigDecimal("0.654")), is(bigDecimal("1.776")));
}
@Test
@@ -61,6 +63,7 @@ public class AggregatorsTest {
assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f));
assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0));
assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775")));
+ assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1777.739")));
}
@Test
@@ -71,6 +74,7 @@ public class AggregatorsTest {
assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0));
assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f));
assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729")));
+ assertThat(sapply(MAX_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1729.876")));
assertThat(sapply(Aggregators.<String>MAX_COMPARABLES(), "b", "a", "d", "c"), is("d"));
}
@@ -82,6 +86,7 @@ public class AggregatorsTest {
assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0));
assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29));
assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17")));
+ assertThat(sapply(MIN_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("17.876")));
assertThat(sapply(Aggregators.<String>MIN_COMPARABLES(), "b", "a", "d", "c"), is("a"));
}
@@ -153,8 +158,14 @@ public class AggregatorsTest {
Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator(
MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES());
+
+ List<Tuple3<Float, BigDecimal, BigDecimal>> input1 = ImmutableList.of(Tuple3.of(17.29f, bigDecimal("12.2"), bigDecimal("0.1")),
+ Tuple3.of(3.0f, bigDecimal("1.2"), bigDecimal("3.14")), Tuple3.of(-1.0f, bigDecimal("14.5"), bigDecimal("-0.98")));
+ Aggregator<Tuple3<Float, BigDecimal, BigDecimal>> b = Aggregators.tripAggregator(
+ MAX_FLOATS(), MAX_BIGDECIMALS(), MIN_BIGDECIMALS());
assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98)));
+ assertThat(sapply(b, input1), is(Tuple3.of(17.29f, bigDecimal("14.5"), bigDecimal("-0.98"))));
}
@Test
@@ -163,8 +174,14 @@ public class AggregatorsTest {
Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator(
MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
+
+ List<Tuple4<BigDecimal, Double, Double, Integer>> input1 = ImmutableList.of(Tuple4.of(bigDecimal("17.29"), 12.2, 0.1, 1),
+ Tuple4.of(bigDecimal("3.0"), 1.2, 3.14, 2), Tuple4.of(bigDecimal("-1.0"), 14.5, -0.98, 3));
+ Aggregator<Tuple4<BigDecimal, Double, Double, Integer>> b = Aggregators.quadAggregator(
+ MAX_BIGDECIMALS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6)));
+ assertThat(sapply(b, input1), is(Tuple4.of(bigDecimal("17.29"), 14.5, -0.98, 6)));
}
@Test
@@ -233,4 +250,8 @@ public class AggregatorsTest {
private static BigInteger bigInt(String value) {
return new BigInteger(value);
}
+
+ /** Shorthand for building a {@link BigDecimal} from its decimal string form. */
+ private static BigDecimal bigDecimal(String value) {
+ return new BigDecimal(value);
+ }
}