You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/13 18:20:31 UTC
[1/2] incubator-beam git commit: [BEAM-285] Avoid boxing and unboxing in transforms Min and Max
Repository: incubator-beam
Updated Branches:
refs/heads/master 1a7cd4112 -> 8f5ce28dc
[BEAM-285] Avoid boxing and unboxing in transforms Min and Max
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21a5b44c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21a5b44c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21a5b44c
Branch: refs/heads/master
Commit: 21a5b44c3b541ba6c89df5649afe00412df73d10
Parents: 1a7cd41
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 20:31:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 13 11:20:18 2016 -0700
----------------------------------------------------------------------
examples/pom.xml | 2 +-
.../org/apache/beam/sdk/transforms/Max.java | 42 ++++++++++++------
.../org/apache/beam/sdk/transforms/Min.java | 45 ++++++++++++++------
.../beam/sdk/util/state/StateTagTest.java | 5 +--
4 files changed, 66 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index dbf89e4..ee12b69 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -48,4 +48,4 @@
</profile>
</profiles>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index c05bd17..52617b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -218,10 +218,16 @@ public class Max {
* A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxIntegerFn extends MaxFn<Integer> implements
- CounterProvider<Integer> {
- public MaxIntegerFn() {
- super(Integer.MIN_VALUE, new Top.Largest<Integer>());
+ public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn
+ implements CounterProvider<Integer> {
+ @Override
+ public int apply(int left, int right) {
+ return left >= right ? left : right;
+ }
+
+ @Override
+ public int identity() {
+ return Integer.MIN_VALUE;
}
@Override
@@ -234,10 +240,16 @@ public class Max {
* A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxLongFn extends MaxFn<Long> implements
- CounterProvider<Long> {
- public MaxLongFn() {
- super(Long.MIN_VALUE, new Top.Largest<Long>());
+ public static class MaxLongFn extends Combine.BinaryCombineLongFn
+ implements CounterProvider<Long> {
+ @Override
+ public long apply(long left, long right) {
+ return left >= right ? left : right;
+ }
+
+ @Override
+ public long identity() {
+ return Long.MIN_VALUE;
}
@Override
@@ -250,10 +262,16 @@ public class Max {
* A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxDoubleFn extends MaxFn<Double> implements
- CounterProvider<Double> {
- public MaxDoubleFn() {
- super(Double.NEGATIVE_INFINITY, new Top.Largest<Double>());
+ public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn
+ implements CounterProvider<Double> {
+ @Override
+ public double apply(double left, double right) {
+ return left >= right ? left : right;
+ }
+
+ @Override
+ public double identity() {
+ return Double.NEGATIVE_INFINITY;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 7a6a8a2..3159134 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -218,10 +218,17 @@ public class Min {
* A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinIntegerFn extends MinFn<Integer> implements
- CounterProvider<Integer> {
- public MinIntegerFn() {
- super(Integer.MAX_VALUE, new Top.Largest<Integer>());
+ public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn
+ implements CounterProvider<Integer> {
+
+ @Override
+ public int apply(int left, int right) {
+ return left <= right ? left : right;
+ }
+
+ @Override
+ public int identity() {
+ return Integer.MAX_VALUE;
}
@Override
@@ -234,10 +241,17 @@ public class Min {
* A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinLongFn extends MinFn<Long> implements
- CounterProvider<Long> {
- public MinLongFn() {
- super(Long.MAX_VALUE, new Top.Largest<Long>());
+ public static class MinLongFn extends Combine.BinaryCombineLongFn
+ implements CounterProvider<Long> {
+
+ @Override
+ public long apply(long left, long right) {
+ return left <= right ? left : right;
+ }
+
+ @Override
+ public long identity() {
+ return Long.MAX_VALUE;
}
@Override
@@ -250,10 +264,17 @@ public class Min {
* A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinDoubleFn extends MinFn<Double> implements
- CounterProvider<Double> {
- public MinDoubleFn() {
- super(Double.POSITIVE_INFINITY, new Top.Largest<Double>());
+ public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn
+ implements CounterProvider<Double> {
+
+ @Override
+ public double apply(double left, double right) {
+ return left <= right ? left : right;
+ }
+
+ @Override
+ public double identity() {
+ return Double.POSITIVE_INFINITY;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
index df67709..ec7698d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Combine.Holder;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.Min;
@@ -134,8 +133,8 @@ public class StateTagTest {
MaxIntegerFn maxFn = new Max.MaxIntegerFn();
MinIntegerFn minFn = new Min.MinIntegerFn();
- Coder<Holder<Integer>> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
- Coder<Holder<Integer>> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
+ Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
+ Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn());
[2/2] incubator-beam git commit: Closes #552
Posted by dh...@apache.org.
Closes #552
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f5ce28d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f5ce28d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f5ce28d
Branch: refs/heads/master
Commit: 8f5ce28dcb82a9f96185f783aaf5dca332475b78
Parents: 1a7cd41 21a5b44
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jul 13 11:20:19 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 13 11:20:19 2016 -0700
----------------------------------------------------------------------
examples/pom.xml | 2 +-
.../org/apache/beam/sdk/transforms/Max.java | 42 ++++++++++++------
.../org/apache/beam/sdk/transforms/Min.java | 45 ++++++++++++++------
.../beam/sdk/util/state/StateTagTest.java | 5 +--
4 files changed, 66 insertions(+), 28 deletions(-)
----------------------------------------------------------------------