You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:34:53 UTC
[3/3] incubator-beam git commit: Fixed Combine display data
Fixed Combine display data
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6adaebf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6adaebf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6adaebf7
Branch: refs/heads/master
Commit: 6adaebf76a6f6caef66dc60a56b94e2734689723
Parents: 2c0bbfe
Author: Ian Zhou <ia...@google.com>
Authored: Thu Aug 18 13:50:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:34:37 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 53 ++++++++++++++++++--
.../apache/beam/sdk/transforms/CombineTest.java | 19 +++++++
2 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 26f0f66..d432e15 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -1815,7 +1816,14 @@ public class Combine {
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
- new SerializableFunction<K, Integer>() {
+ new SimpleFunction<K, Integer>() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout)
+ .withLabel("Key Fanout Size"), 0);
+ }
+
@Override
public Integer apply(K unused) {
return hotKeyFanout;
@@ -1904,7 +1912,7 @@ public class Combine {
new InputOrAccum.InputOrAccumCoder<InputT, AccumT>(
inputCoder.getValueCoder(), accumCoder);
- // A CombineFn's mergeAccumulator can be applied in a tree-like fashon.
+ // A CombineFn's mergeAccumulator can be applied in a tree-like fashion.
// Here we shard the key using an integer nonce, combine on that partial
// set of values, then drop the nonce and do a final combine of the
// aggregates. We do this by splitting the original CombineFn into two,
@@ -1944,6 +1952,16 @@ public class Combine {
throws CannotProvideCoderException {
return accumCoder;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+ .withLabel("Fanout Function"));
+ if (hotKeyFanout instanceof HasDisplayData) {
+ ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+ }
+ }
};
postCombine =
new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -1988,6 +2006,15 @@ public class Combine {
throws CannotProvideCoderException {
return accumCoder;
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+ .withLabel("Fanout Function"));
+ if (hotKeyFanout instanceof HasDisplayData) {
+ ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+ }
+ }
};
} else {
final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
@@ -2028,6 +2055,15 @@ public class Combine {
throws CannotProvideCoderException {
return accumCoder;
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+ .withLabel("Fanout Function"));
+ if (hotKeyFanout instanceof HasDisplayData) {
+ ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+ }
+ }
};
postCombine =
new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@@ -2073,6 +2109,15 @@ public class Combine {
throws CannotProvideCoderException {
return accumCoder;
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
+ .withLabel("Fanout Function"));
+ if (hotKeyFanout instanceof HasDisplayData) {
+ ((HasDisplayData) hotKeyFanout).populateDisplayData(builder);
+ }
+ }
};
}
@@ -2117,7 +2162,7 @@ public class Combine {
.setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
inputCoder.getValueCoder()))
.setWindowingStrategyInternal(preCombineStrategy)
- .apply("PreCombineHot", Combine.perKey(hotPreCombine))
+ .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
.apply("StripNonce", MapElements.via(
new SimpleFunction<KV<KV<K, Integer>, AccumT>,
KV<K, InputOrAccum<InputT, AccumT>>>() {
@@ -2147,7 +2192,7 @@ public class Combine {
// Combine the union of the pre-processed hot and cold key results.
return PCollectionList.of(precombinedHot).and(preprocessedCold)
.apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
- .apply("PostCombine", Combine.perKey(postCombine));
+ .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6adaebf7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 77a1d6b..be061af 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -731,6 +731,25 @@ public class CombineTest implements Serializable {
displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
}
+ @Test
+ @Category(RunnableOnService.class)
+ public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
+ int hotKeyFanout = 2;
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+ CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
+ PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>> combine =
+ Combine.<Integer, Integer, Set<Integer>>perKey(combineFn).withHotKeyFanout(hotKeyFanout);
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
+ KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+ assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive "
+ + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
+ assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive "
+ + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout)));
+ }
+
////////////////////////////////////////////////////////////////////////////
// Test classes, for different kinds of combining fns.