You are viewing a plain-text version of this content. The canonical link for it is not reproduced in this plain-text version.
Posted to commits@beam.apache.org by am...@apache.org on 2021/03/12 01:47:03 UTC

[beam] branch master updated: [BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2da7448  [BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)
2da7448 is described below

commit 2da7448e1a4be2870eeb0e994937beb7e4d4ad1c
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Thu Mar 11 17:46:00 2021 -0800

    [BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)
---
 .../functions/FlinkMergingNonShuffleReduceFunction.java          | 4 ++--
 .../flink/translation/functions/FlinkPartialReduceFunction.java  | 9 ++++-----
 .../runners/flink/translation/functions/FlinkReduceFunction.java | 9 ++++-----
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index b34649f..b1b95c6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -83,7 +83,7 @@ public class FlinkMergingNonShuffleReduceFunction<
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
     AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
-    if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+    if (windowingStrategy.getWindowFn() instanceof Sessions) {
       reduceRunner = new SortingFlinkCombineRunner<>();
     } else {
       reduceRunner = new HashingFlinkCombineRunner<>();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index f98b9df..7b0f5d5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -98,11 +98,10 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     if (groupedByWindow) {
       reduceRunner = new SingleWindowFlinkCombineRunner<>();
     } else {
-      if (windowingStrategy.needsMerge()
-          && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-        reduceRunner = new HashingFlinkCombineRunner<>();
-      } else {
+      if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {
         reduceRunner = new SortingFlinkCombineRunner<>();
+      } else {
+        reduceRunner = new HashingFlinkCombineRunner<>();
       }
     }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 80ce7ef..1399869 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -98,11 +98,10 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     if (groupedByWindow) {
       reduceRunner = new SingleWindowFlinkCombineRunner<>();
     } else {
-      if (windowingStrategy.needsMerge()
-          && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-        reduceRunner = new HashingFlinkCombineRunner<>();
-      } else {
+      if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {
         reduceRunner = new SortingFlinkCombineRunner<>();
+      } else {
+        reduceRunner = new HashingFlinkCombineRunner<>();
       }
     }