You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by da...@apache.org on 2023/12/13 22:31:08 UTC
(beam) branch master updated: Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)
This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a9f5ab14d06 Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)
a9f5ab14d06 is described below
commit a9f5ab14d0689568c6822f93bec0c2ca7658cb57
Author: Marc <53...@users.noreply.github.com>
AuthorDate: Wed Dec 13 23:31:01 2023 +0100
Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)
---
.../org/apache/beam/sdk/transforms/Combine.java | 24 ++++++++++++++++++----
.../apache/beam/sdk/transforms/CombineTest.java | 24 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index ffbfac460dc..f1a964fa5a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1555,7 +1555,7 @@ public class Combine {
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
SerializableFunction<? super K, Integer> hotKeyFanout) {
- return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout, fewKeys);
+ return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout, fewKeys, sideInputs);
}
/**
@@ -1578,7 +1578,8 @@ public class Combine {
return hotKeyFanout;
}
},
- fewKeys);
+ fewKeys,
+ sideInputs);
}
/** Returns the {@link GlobalCombineFn} used by this Combine operation. */
@@ -1624,18 +1625,20 @@ public class Combine {
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
-
private final boolean fewKeys;
+ private final List<PCollectionView<?>> sideInputs;
private PerKeyWithHotKeyFanout(
GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
SerializableFunction<? super K, Integer> hotKeyFanout,
- boolean fewKeys) {
+ boolean fewKeys,
+ List<PCollectionView<?>> sideInputs) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.hotKeyFanout = hotKeyFanout;
this.fewKeys = fewKeys;
+ this.sideInputs = sideInputs;
}
@Override
@@ -1928,6 +1931,10 @@ public class Combine {
fewKeys
? Combine.fewKeys(hotPreCombine, fnDisplayData)
: Combine.perKey(hotPreCombine, fnDisplayData);
+ if (!sideInputs.isEmpty()) {
+ hotPreCombineTransform = hotPreCombineTransform.withSideInputs(sideInputs);
+ }
+
PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
split
.get(hot)
@@ -1975,6 +1982,10 @@ public class Combine {
fewKeys
? Combine.fewKeys(postCombine, fnDisplayData)
: Combine.perKey(postCombine, fnDisplayData);
+ if (!sideInputs.isEmpty()) {
+ postCombineTransform = postCombineTransform.withSideInputs(sideInputs);
+ }
+
return PCollectionList.of(precombinedHot)
.and(preprocessedCold)
.apply(Flatten.pCollections())
@@ -1993,6 +2004,11 @@ public class Combine {
DisplayData.item("fanoutFn", hotKeyFanout.getClass()).withLabel("Fanout Function"));
}
+ /** Returns the side inputs used by this Combine operation. */
+ public List<PCollectionView<?>> getSideInputs() {
+ return sideInputs;
+ }
+
/**
* Used to store either an input or accumulator value, for flattening the hot and cold key
* paths.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 024fedd177e..f070378a64e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -1031,6 +1031,30 @@ public class CombineTest implements Serializable {
assertEquals(Collections.singletonList(view), combine.getSideInputs());
}
+
+ @Test
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
+ public void testHotKeyCombineWithSideInputs() {
+ PCollection<KV<String, Integer>> input =
+ createInput(
+ pipeline,
+ Arrays.asList(
+ KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)));
+ PCollection<Integer> sum =
+ input.apply(Values.create()).apply("Sum", Combine.globally(new SumInts()));
+ PCollectionView<Integer> sumView = sum.apply(View.asSingleton());
+
+ PCollection<KV<String, String>> combinePerKeyWithSideInputsAndHotKey =
+ input.apply(
+ Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
+ .withSideInputs(sumView)
+ .withHotKeyFanout(1));
+
+ PAssert.that(combinePerKeyWithSideInputsAndHotKey)
+ .containsInAnyOrder(Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")));
+
+ pipeline.run();
+ }
}
/** Tests validating windowing behaviors. */