You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:52 UTC

[21/50] [abbrv] incubator-beam git commit: Fixed Combine display data

Fixed Combine display data


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9943fd7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9943fd7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9943fd7d

Branch: refs/heads/gearpump-runner
Commit: 9943fd7d47819d522cef248d23c8db8f42981ad3
Parents: f44fa2c
Author: Ian Zhou <ia...@google.com>
Authored: Thu Aug 18 13:50:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 53 ++++++++++++++++++--
 .../apache/beam/sdk/transforms/CombineTest.java | 19 +++++++
 2 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9943fd7d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 26f0f66..d432e15 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -1815,7 +1816,14 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
       return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
-          new SerializableFunction<K, Integer>() {
+          new SimpleFunction<K, Integer>() {
+            @Override
+            public void populateDisplayData(Builder builder) {
+              super.populateDisplayData(builder);
+              builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout)
+                  .withLabel("Key Fanout Size"), 0);
+            }
+
             @Override
             public Integer apply(K unused) {
               return hotKeyFanout;
@@ -1904,7 +1912,7 @@ public class Combine {
           new InputOrAccum.InputOrAccumCoder<InputT, AccumT>(
               inputCoder.getValueCoder(), accumCoder);
 
-      // A CombineFn's mergeAccumulator can be applied in a tree-like fashon.
+      // A CombineFn's mergeAccumulator can be applied in a tree-like fashion.
       // Here we shard the key using an integer nonce, combine on that partial
       // set of values, then drop the nonce and do a final combine of the
       // aggregates.  We do this by splitting the original CombineFn into two,
@@ -1944,6 +1952,16 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
         postCombine =
             new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -1988,6 +2006,15 @@ public class Combine {
                       throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
       } else {
         final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
@@ -2028,6 +2055,15 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
         postCombine =
             new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -2073,6 +2109,15 @@ public class Combine {
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+              @Override
+              public void populateDisplayData(DisplayData.Builder builder) {
+                super.populateDisplayData(builder);
+                builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+                    .withLabel("Fanout Function"));
+                if (hotKeyFanout instanceof HasDisplayData) {
+                  ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+                }
+              }
             };
       }
 
@@ -2117,7 +2162,7 @@ public class Combine {
           .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
                                inputCoder.getValueCoder()))
           .setWindowingStrategyInternal(preCombineStrategy)
-          .apply("PreCombineHot", Combine.perKey(hotPreCombine))
+          .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
           .apply("StripNonce", MapElements.via(
               new SimpleFunction<KV<KV<K, Integer>, AccumT>,
                        KV<K, InputOrAccum<InputT, AccumT>>>() {
@@ -2147,7 +2192,7 @@ public class Combine {
       // Combine the union of the pre-processed hot and cold key results.
       return PCollectionList.of(precombinedHot).and(preprocessedCold)
           .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
-          .apply("PostCombine", Combine.perKey(postCombine));
+          .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9943fd7d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 77a1d6b..be061af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -731,6 +731,25 @@ public class CombineTest implements Serializable {
         displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
+    int hotKeyFanout = 2;
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
+    PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>> combine =
+        Combine.<Integer, Integer, Set<Integer>>perKey(combineFn).withHotKeyFanout(hotKeyFanout);
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
+        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+    assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive "
+        + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
+    assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive "
+        + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout)));
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // Test classes, for different kinds of combining fns.