You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/12/13 22:31:08 UTC

(beam) branch master updated: Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)

This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f5ab14d06 Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)
a9f5ab14d06 is described below

commit a9f5ab14d0689568c6822f93bec0c2ca7658cb57
Author: Marc <53...@users.noreply.github.com>
AuthorDate: Wed Dec 13 23:31:01 2023 +0100

    Add support to use side inputs with Combine.PerKeyWithHotKeyFanout (#28867)
---
 .../org/apache/beam/sdk/transforms/Combine.java    | 24 ++++++++++++++++++----
 .../apache/beam/sdk/transforms/CombineTest.java    | 24 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index ffbfac460dc..f1a964fa5a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1555,7 +1555,7 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout, fewKeys);
+      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout, fewKeys, sideInputs);
     }
 
     /**
@@ -1578,7 +1578,8 @@ public class Combine {
               return hotKeyFanout;
             }
           },
-          fewKeys);
+          fewKeys,
+          sideInputs);
     }
 
     /** Returns the {@link GlobalCombineFn} used by this Combine operation. */
@@ -1624,18 +1625,20 @@ public class Combine {
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
-
     private final boolean fewKeys;
+    private final List<PCollectionView<?>> sideInputs;
 
     private PerKeyWithHotKeyFanout(
         GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout,
-        boolean fewKeys) {
+        boolean fewKeys,
+        List<PCollectionView<?>> sideInputs) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.hotKeyFanout = hotKeyFanout;
       this.fewKeys = fewKeys;
+      this.sideInputs = sideInputs;
     }
 
     @Override
@@ -1928,6 +1931,10 @@ public class Combine {
           fewKeys
               ? Combine.fewKeys(hotPreCombine, fnDisplayData)
               : Combine.perKey(hotPreCombine, fnDisplayData);
+      if (!sideInputs.isEmpty()) {
+        hotPreCombineTransform = hotPreCombineTransform.withSideInputs(sideInputs);
+      }
+
       PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
           split
               .get(hot)
@@ -1975,6 +1982,10 @@ public class Combine {
           fewKeys
               ? Combine.fewKeys(postCombine, fnDisplayData)
               : Combine.perKey(postCombine, fnDisplayData);
+      if (!sideInputs.isEmpty()) {
+        postCombineTransform = postCombineTransform.withSideInputs(sideInputs);
+      }
+
       return PCollectionList.of(precombinedHot)
           .and(preprocessedCold)
           .apply(Flatten.pCollections())
@@ -1993,6 +2004,11 @@ public class Combine {
           DisplayData.item("fanoutFn", hotKeyFanout.getClass()).withLabel("Fanout Function"));
     }
 
+    /** Returns the side inputs used by this Combine operation. */
+    public List<PCollectionView<?>> getSideInputs() {
+      return sideInputs;
+    }
+
     /**
      * Used to store either an input or accumulator value, for flattening the hot and cold key
      * paths.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 024fedd177e..f070378a64e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -1031,6 +1031,30 @@ public class CombineTest implements Serializable {
 
       assertEquals(Collections.singletonList(view), combine.getSideInputs());
     }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
+    public void testHotKeyCombineWithSideInputs() {
+      PCollection<KV<String, Integer>> input =
+          createInput(
+              pipeline,
+              Arrays.asList(
+                  KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)));
+      PCollection<Integer> sum =
+          input.apply(Values.create()).apply("Sum", Combine.globally(new SumInts()));
+      PCollectionView<Integer> sumView = sum.apply(View.asSingleton());
+
+      PCollection<KV<String, String>> combinePerKeyWithSideInputsAndHotKey =
+          input.apply(
+              Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
+                  .withSideInputs(sumView)
+                  .withHotKeyFanout(1));
+
+      PAssert.that(combinePerKeyWithSideInputsAndHotKey)
+          .containsInAnyOrder(Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")));
+
+      pipeline.run();
+    }
   }
 
   /** Tests validating windowing behaviors. */