You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/13 18:20:31 UTC

[1/2] incubator-beam git commit: [BEAM-285] Avoid boxing and unboxing in transforms Min and Max

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1a7cd4112 -> 8f5ce28dc


[BEAM-285] Avoid boxing and unboxing in transforms Min and Max


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21a5b44c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21a5b44c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21a5b44c

Branch: refs/heads/master
Commit: 21a5b44c3b541ba6c89df5649afe00412df73d10
Parents: 1a7cd41
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 20:31:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 13 11:20:18 2016 -0700

----------------------------------------------------------------------
 examples/pom.xml                                |  2 +-
 .../org/apache/beam/sdk/transforms/Max.java     | 42 ++++++++++++------
 .../org/apache/beam/sdk/transforms/Min.java     | 45 ++++++++++++++------
 .../beam/sdk/util/state/StateTagTest.java       |  5 +--
 4 files changed, 66 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index dbf89e4..ee12b69 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -48,4 +48,4 @@
       </profile>
   </profiles>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index c05bd17..52617b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -218,10 +218,16 @@ public class Max {
    * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxIntegerFn extends MaxFn<Integer> implements
-      CounterProvider<Integer> {
-    public MaxIntegerFn() {
-      super(Integer.MIN_VALUE, new Top.Largest<Integer>());
+  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn
+      implements CounterProvider<Integer> {
+    @Override
+    public int apply(int left, int right) {
+      return left >= right ? left : right;
+    }
+
+    @Override
+    public int identity() {
+      return Integer.MIN_VALUE;
     }
 
     @Override
@@ -234,10 +240,16 @@ public class Max {
    * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxLongFn extends MaxFn<Long> implements
-      CounterProvider<Long> {
-    public MaxLongFn() {
-      super(Long.MIN_VALUE, new Top.Largest<Long>());
+  public static class MaxLongFn extends Combine.BinaryCombineLongFn
+      implements CounterProvider<Long> {
+    @Override
+    public long apply(long left, long right) {
+      return left >= right ? left : right;
+    }
+
+    @Override
+    public long identity() {
+      return Long.MIN_VALUE;
     }
 
     @Override
@@ -250,10 +262,16 @@ public class Max {
    * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxDoubleFn extends MaxFn<Double> implements
-      CounterProvider<Double> {
-    public MaxDoubleFn() {
-      super(Double.NEGATIVE_INFINITY, new Top.Largest<Double>());
+  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn
+      implements CounterProvider<Double> {
+    @Override
+    public double apply(double left, double right) {
+      return left >= right ? left : right;
+    }
+
+    @Override
+    public double identity() {
+      return Double.NEGATIVE_INFINITY;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 7a6a8a2..3159134 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -218,10 +218,17 @@ public class Min {
    * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinIntegerFn extends MinFn<Integer> implements
-      CounterProvider<Integer> {
-    public MinIntegerFn() {
-      super(Integer.MAX_VALUE, new Top.Largest<Integer>());
+  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn
+      implements CounterProvider<Integer> {
+
+    @Override
+    public int apply(int left, int right) {
+      return left <= right ? left : right;
+    }
+
+    @Override
+    public int identity() {
+      return Integer.MAX_VALUE;
     }
 
     @Override
@@ -234,10 +241,17 @@ public class Min {
    * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinLongFn extends MinFn<Long> implements
-      CounterProvider<Long> {
-    public MinLongFn() {
-      super(Long.MAX_VALUE, new Top.Largest<Long>());
+  public static class MinLongFn extends Combine.BinaryCombineLongFn
+      implements CounterProvider<Long> {
+
+    @Override
+    public long apply(long left, long right) {
+      return left <= right ? left : right;
+    }
+
+    @Override
+    public long identity() {
+      return Long.MAX_VALUE;
     }
 
     @Override
@@ -250,10 +264,17 @@ public class Min {
    * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinDoubleFn extends MinFn<Double> implements
-      CounterProvider<Double> {
-    public MinDoubleFn() {
-      super(Double.POSITIVE_INFINITY, new Top.Largest<Double>());
+  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn
+      implements CounterProvider<Double> {
+
+    @Override
+    public double apply(double left, double right) {
+      return left <= right ? left : right;
+    }
+
+    @Override
+    public double identity() {
+      return Double.POSITIVE_INFINITY;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/21a5b44c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
index df67709..ec7698d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateTagTest.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Combine.Holder;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.Min;
@@ -134,8 +133,8 @@ public class StateTagTest {
     MaxIntegerFn maxFn = new Max.MaxIntegerFn();
     MinIntegerFn minFn = new Min.MinIntegerFn();
 
-    Coder<Holder<Integer>> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
-    Coder<Holder<Integer>> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
+    Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
+    Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
 
     StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext(
             "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn());


[2/2] incubator-beam git commit: Closes #552

Posted by dh...@apache.org.
Closes #552


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f5ce28d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f5ce28d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f5ce28d

Branch: refs/heads/master
Commit: 8f5ce28dcb82a9f96185f783aaf5dca332475b78
Parents: 1a7cd41 21a5b44
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jul 13 11:20:19 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 13 11:20:19 2016 -0700

----------------------------------------------------------------------
 examples/pom.xml                                |  2 +-
 .../org/apache/beam/sdk/transforms/Max.java     | 42 ++++++++++++------
 .../org/apache/beam/sdk/transforms/Min.java     | 45 ++++++++++++++------
 .../beam/sdk/util/state/StateTagTest.java       |  5 +--
 4 files changed, 66 insertions(+), 28 deletions(-)
----------------------------------------------------------------------