You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2021/03/12 01:47:03 UTC
[beam] branch master updated: [BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2da7448 [BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)
2da7448 is described below
commit 2da7448e1a4be2870eeb0e994937beb7e4d4ad1c
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Thu Mar 11 17:46:00 2021 -0800
[BEAM-11887] Change SortingFlinkCombineRunner usage to only for Sessions (#14120)
---
.../functions/FlinkMergingNonShuffleReduceFunction.java | 4 ++--
.../flink/translation/functions/FlinkPartialReduceFunction.java | 9 ++++-----
.../runners/flink/translation/functions/FlinkReduceFunction.java | 9 ++++-----
3 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index b34649f..b1b95c6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -83,7 +83,7 @@ public class FlinkMergingNonShuffleReduceFunction<
new FlinkSideInputReader(sideInputs, getRuntimeContext());
AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
- if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ if (windowingStrategy.getWindowFn() instanceof Sessions) {
reduceRunner = new SortingFlinkCombineRunner<>();
} else {
reduceRunner = new HashingFlinkCombineRunner<>();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index f98b9df..7b0f5d5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -98,11 +98,10 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
if (groupedByWindow) {
reduceRunner = new SingleWindowFlinkCombineRunner<>();
} else {
- if (windowingStrategy.needsMerge()
- && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
- reduceRunner = new HashingFlinkCombineRunner<>();
- } else {
+ if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {
reduceRunner = new SortingFlinkCombineRunner<>();
+ } else {
+ reduceRunner = new HashingFlinkCombineRunner<>();
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 80ce7ef..1399869 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -98,11 +98,10 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
if (groupedByWindow) {
reduceRunner = new SingleWindowFlinkCombineRunner<>();
} else {
- if (windowingStrategy.needsMerge()
- && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
- reduceRunner = new HashingFlinkCombineRunner<>();
- } else {
+ if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {
reduceRunner = new SortingFlinkCombineRunner<>();
+ } else {
+ reduceRunner = new HashingFlinkCombineRunner<>();
}
}