You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by di...@apache.org on 2022/03/22 14:11:56 UTC

[flink] branch release-1.13 updated: [FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 9abd874  [FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice
9abd874 is described below

commit 9abd874ee1c1d9ef9294a23b67d5ab7b4e956008
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Tue Mar 8 17:18:57 2022 +0800

    [FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice
    
    This closes #19173.
    
    (cherry picked from commit a54c11040dd0103ffce5146a09a2ab6103134906)
---
 .../pyflink/datastream/tests/test_data_stream.py   | 52 +++++++++++++++++++---
 .../fn_execution/datastream/merging_window_set.py  |  2 +-
 flink-python/pyflink/fn_execution/state_impl.py    |  9 ++--
 .../streaming/api/utils/PythonOperatorUtils.java   | 11 +++++
 .../utils/output/OutputWithTimerRowHandler.java    |  8 ++--
 5 files changed, 66 insertions(+), 16 deletions(-)

diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 1e3ce18..25099d3 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -855,11 +855,30 @@ class DataStreamTests(object):
 
         self.env.execute('test_time_window')
         result = self.test_sink.get_results()
-        expected_result = ['(hi,1)', '(hi,1)', '(hi,2)', '(hi,3)']
+        expected_result = ['(hi,1)', '(hi,3)', '(hi,3)']
         result.sort()
         expected_result.sort()
         self.assertEqual(expected_result, result)
 
+    def test_session_window_late_merge(self):
+        self.env.set_parallelism(1)
+        data_stream = self.env.from_collection([
+            ('hi', 0), ('hi', 2), ('hi', 1)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .key_by(lambda x: x[0], key_type=Types.STRING()) \
+            .window(SimpleMergeTimeWindowAssigner()) \
+            .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
+            .add_sink(self.test_sink)
+
+        self.env.execute('test_session_window_late_merge')
+        results = self.test_sink.get_results()
+        expected = ['(hi,3)']
+        results.sort()
+        self.assertEqual(expected, results)
+
 
 class StreamingModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
     def test_data_stream_name(self):
@@ -1545,12 +1564,31 @@ class SimpleMergeTimeWindowAssigner(MergingWindowAssigner[tuple, TimeWindow]):
     def merge_windows(self,
                       windows: Iterable[TimeWindow],
                       callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
-        window_list = [w for w in windows]
-        window_list.sort()
-        for i in range(1, len(window_list)):
-            if window_list[i - 1].end > window_list[i].start:
-                callback.merge([window_list[i - 1], window_list[i]],
-                               TimeWindow(window_list[i - 1].start, window_list[i].end))
+        sorted_windows = list(windows)
+        sorted_windows.sort()
+        merged = []
+        current_merge = None
+        current_merge_set = set()
+
+        for candidate in sorted_windows:
+            if current_merge is None:
+                current_merge = candidate
+                current_merge_set.add(candidate)
+            elif current_merge.intersects(candidate):
+                current_merge = current_merge.cover(candidate)
+                current_merge_set.add(candidate)
+            else:
+                merged.append((current_merge, current_merge_set))
+                current_merge = candidate
+                current_merge_set = set()
+                current_merge_set.add(candidate)
+
+        if current_merge is not None:
+            merged.append((current_merge, current_merge_set))
+
+        for merge_key, merge_set in merged:
+            if len(merge_set) > 1:
+                callback.merge(merge_set, merge_key)
 
     def assign_windows(self,
                        element: tuple,
diff --git a/flink-python/pyflink/fn_execution/datastream/merging_window_set.py b/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
index 0bc33f8..14d95f1 100644
--- a/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
+++ b/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
@@ -102,7 +102,7 @@ class MergingWindowSet(Generic[W]):
             self._mapping[merge_result] = merged_state_window
             merged_state_windows.remove(merged_state_window)
 
-            if merge_result not in merged_windows and len(merged_windows) == 1:
+            if merge_result not in merged_windows or len(merged_windows) != 1:
                 merge_function.merge(
                     merge_result,
                     merged_windows,
diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py
index 79d6de9..50a2115 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -1063,13 +1063,14 @@ class RemoteKeyedStateBackend(object):
             self._map_state_handler.clear(self._clear_iterator_mark)
 
     def merge_namespaces(self, state: SynchronousMergingRuntimeState, target, sources):
+        for source in sources:
+            state.set_current_namespace(source)
+            self.commit_internal_state(state.get_internal_state())
         state.set_current_namespace(target)
         self.commit_internal_state(state.get_internal_state())
         encoded_target_namespace = self._encode_namespace(target)
-        encoded_namespaces = [encoded_target_namespace]
-        for source in sources:
-            encoded_namespaces.append(self._encode_namespace(source))
-        self.clear_state_cache(state, encoded_namespaces)
+        encoded_namespaces = [self._encode_namespace(source) for source in sources]
+        self.clear_state_cache(state, [encoded_target_namespace] + encoded_namespaces)
 
         state_key = self.get_bag_state_key(
             state.name, self._encoded_current_key, encoded_target_namespace)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
index 76a8a45..d5206fb 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
 import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
 import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -203,6 +205,15 @@ public enum PythonOperatorUtils {
         }
     }
 
+    /** Set the current key for the timer service. */
+    public static <K, N> void setCurrentKeyForTimerService(
+            InternalTimerService<N> internalTimerService, K currentKey) throws Exception {
+        if (internalTimerService instanceof BatchExecutionInternalTimeService) {
+            ((BatchExecutionInternalTimeService<K, N>) internalTimerService)
+                    .setCurrentKey(currentKey);
+        }
+    }
+
     public static <K> boolean inBatchExecutionMode(KeyedStateBackend<K> stateBackend) {
         return stateBackend instanceof BatchExecutionKeyedStateBackend;
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
index 7350e7a..a5ac5cf 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
@@ -29,10 +29,9 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
 import org.apache.flink.types.Row;
 
-import java.io.IOException;
-
 /** This handler can accepts the runner output which contains timer registration event. */
 public class OutputWithTimerRowHandler {
 
@@ -59,7 +58,7 @@ public class OutputWithTimerRowHandler {
         this.baisWrapper = new DataInputViewStreamWrapper(bais);
     }
 
-    public void accept(Row runnerOutput, long timestamp) throws IOException {
+    public void accept(Row runnerOutput, long timestamp) throws Exception {
         switch (RunnerOutputType.valueOf((byte) runnerOutput.getField(0))) {
             case NORMAL_RECORD:
                 onData(timestamp, runnerOutput.getField(1));
@@ -87,9 +86,10 @@ public class OutputWithTimerRowHandler {
     }
 
     private void onTimerOperation(
-            TimerOperandType operandType, long time, Row key, Object namespace) {
+            TimerOperandType operandType, long time, Row key, Object namespace) throws Exception {
         synchronized (keyedStateBackend) {
             keyContext.setCurrentKey(key);
+            PythonOperatorUtils.setCurrentKeyForTimerService(internalTimerService, key);
             switch (operandType) {
                 case REGISTER_EVENT_TIMER:
                     internalTimerService.registerEventTimeTimer(namespace, time);