You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:34:51 UTC

[1/3] incubator-beam git commit: Closes #906

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2c0bbfeec -> 3ef7a35b7


Closes #906


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ef7a35b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ef7a35b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ef7a35b

Branch: refs/heads/master
Commit: 3ef7a35b72e051d1a062720071726d502a78a9a3
Parents: 2c0bbfe d24d51f
Author: Dan Halperin <dh...@google.com>
Authored: Fri Sep 2 10:34:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:34:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 40 +++++++++++++++++---
 .../apache/beam/sdk/transforms/CombineTest.java | 19 ++++++++++
 2 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Delegate populateDisplayData to wrapped combineFn's

Posted by dh...@apache.org.
Delegate populateDisplayData to wrapped combineFn's


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d24d51fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d24d51fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d24d51fb

Branch: refs/heads/master
Commit: d24d51fb210f0dd7254f6ddd9608eb1b53f7300c
Parents: 6adaebf
Author: Scott Wegner <sw...@google.com>
Authored: Fri Aug 19 13:32:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:34:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 39 ++++++--------------
 1 file changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d24d51fb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index d432e15..2b89372 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1740,7 +1740,7 @@ public class Combine {
   public static class PerKey<K, InputT, OutputT>
     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
-    private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
     private final DisplayData.Item<? extends Class<?>> fnDisplayData;
     private final boolean fewKeys;
     private final List<PCollectionView<?>> sideInputs;
@@ -1820,8 +1820,8 @@ public class Combine {
             @Override
             public void populateDisplayData(Builder builder) {
               super.populateDisplayData(builder);
-              builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout)
-                  .withLabel("Key Fanout Size"), 0);
+              builder.add(DisplayData.item("fanout", hotKeyFanout)
+                  .withLabel("Key Fanout Size"));
             }
 
             @Override
@@ -1866,7 +1866,7 @@ public class Combine {
   public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
-    private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
     private final DisplayData.Item<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
@@ -1955,12 +1955,7 @@ public class Combine {
 
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                super.populateDisplayData(builder);
-                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
-                    .withLabel("Fanout Function"));
-                if (hotKeyFanout instanceof HasDisplayData) {
-                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
-                }
+                builder.include(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2008,12 +2003,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                super.populateDisplayData(builder);
-                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
-                    .withLabel("Fanout Function"));
-                if (hotKeyFanout instanceof HasDisplayData) {
-                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
-                }
+                builder.include(PerKeyWithHotKeyFanout.this);
               }
             };
       } else {
@@ -2057,12 +2047,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                super.populateDisplayData(builder);
-                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
-                    .withLabel("Fanout Function"));
-                if (hotKeyFanout instanceof HasDisplayData) {
-                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
-                }
+                builder.include(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2111,12 +2096,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                super.populateDisplayData(builder);
-                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
-                    .withLabel("Fanout Function"));
-                if (hotKeyFanout instanceof HasDisplayData) {
-                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
-                }
+                builder.include(PerKeyWithHotKeyFanout.this);
               }
             };
       }
@@ -2200,6 +2180,9 @@ public class Combine {
       super.populateDisplayData(builder);
 
       Combine.populateDisplayData(builder, fn, fnDisplayData);
+      if (hotKeyFanout instanceof HasDisplayData) {
+        builder.include((HasDisplayData) hotKeyFanout);
+      }
       builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
         .withLabel("Fanout Function"));
     }


[3/3] incubator-beam git commit: Fixed Combine display data

Posted by dh...@apache.org.
Fixed Combine display data


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6adaebf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6adaebf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6adaebf7

Branch: refs/heads/master
Commit: 6adaebf76a6f6caef66dc60a56b94e2734689723
Parents: 2c0bbfe
Author: Ian Zhou <ia...@google.com>
Authored: Thu Aug 18 13:50:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:34:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 53 ++++++++++++++++++--
 .../apache/beam/sdk/transforms/CombineTest.java | 19 +++++++
 2 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 26f0f66..d432e15 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -1815,7 +1816,14 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
       return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
-          new SerializableFunction<K, Integer>() {
+          new SimpleFunction<K, Integer>() {
+            @Override
+            public void populateDisplayData(Builder builder) {
+              super.populateDisplayData(builder);
+              builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout)
+                  .withLabel("Key Fanout Size"), 0);
+            }
+
             @Override
             public Integer apply(K unused) {
               return hotKeyFanout;
@@ -1904,7 +1912,7 @@ public class Combine {
           new InputOrAccum.InputOrAccumCoder<InputT, AccumT>(
               inputCoder.getValueCoder(), accumCoder);
 
-      // A CombineFn's mergeAccumulator can be applied in a tree-like fashon.
+      // A CombineFn's mergeAccumulator can be applied in a tree-like fashion.
       // Here we shard the key using an integer nonce, combine on that partial
       // set of values, then drop the nonce and do a final combine of the
       // aggregates.  We do this by splitting the original CombineFn into two,
@@ -1944,6 +1952,16 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
         postCombine =
             new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -1988,6 +2006,15 @@ public class Combine {
                       throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
       } else {
         final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
@@ -2028,6 +2055,15 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
         postCombine =
             new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -2073,6 +2109,15 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
       }
 
@@ -2117,7 +2162,7 @@ public class Combine {
           .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
                                inputCoder.getValueCoder()))
           .setWindowingStrategyInternal(preCombineStrategy)
-          .apply("PreCombineHot", Combine.perKey(hotPreCombine))
+          .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
           .apply("StripNonce", MapElements.via(
               new SimpleFunction<KV<KV<K, Integer>, AccumT>,
                        KV<K, InputOrAccum<InputT, AccumT>>>() {
@@ -2147,7 +2192,7 @@ public class Combine {
       // Combine the union of the pre-processed hot and cold key results.
       return PCollectionList.of(precombinedHot).and(preprocessedCold)
           .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
-          .apply("PostCombine", Combine.perKey(postCombine));
+          .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 77a1d6b..be061af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -731,6 +731,25 @@ public class CombineTest implements Serializable {
         displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
+    int hotKeyFanout = 2;
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
+    PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>> combine =
+        Combine.<Integer, Integer, Set<Integer>>perKey(combineFn).withHotKeyFanout(hotKeyFanout);
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
+        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+    assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive "
+        + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
+    assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive "
+        + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout)));
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // Test classes, for different kinds of combining fns.