You are viewing a plain-text version of this content; the link to the canonical version did not survive the conversion to plain text.
Posted to commits@flink.apache.org by di...@apache.org on 2022/03/22 14:11:56 UTC
[flink] branch release-1.13 updated: [FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 9abd874 [FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice
9abd874 is described below
commit 9abd874ee1c1d9ef9294a23b67d5ab7b4e956008
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Tue Mar 8 17:18:57 2022 +0800
[FLINK-26536][python] Fix RemoteKeyedStateBackend#merge_namespaces to handle properly for the timerservice
This closes #19173.
(cherry picked from commit a54c11040dd0103ffce5146a09a2ab6103134906)
---
.../pyflink/datastream/tests/test_data_stream.py | 52 +++++++++++++++++++---
.../fn_execution/datastream/merging_window_set.py | 2 +-
flink-python/pyflink/fn_execution/state_impl.py | 9 ++--
.../streaming/api/utils/PythonOperatorUtils.java | 11 +++++
.../utils/output/OutputWithTimerRowHandler.java | 8 ++--
5 files changed, 66 insertions(+), 16 deletions(-)
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 1e3ce18..25099d3 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -855,11 +855,30 @@ class DataStreamTests(object):
self.env.execute('test_time_window')
result = self.test_sink.get_results()
- expected_result = ['(hi,1)', '(hi,1)', '(hi,2)', '(hi,3)']
+ expected_result = ['(hi,1)', '(hi,3)', '(hi,3)']
result.sort()
expected_result.sort()
self.assertEqual(expected_result, result)
+ def test_session_window_late_merge(self):
+ self.env.set_parallelism(1)
+ data_stream = self.env.from_collection([
+ ('hi', 0), ('hi', 2), ('hi', 1)],
+ type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream
+ watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+ .with_timestamp_assigner(SecondColumnTimestampAssigner())
+ data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+ .key_by(lambda x: x[0], key_type=Types.STRING()) \
+ .window(SimpleMergeTimeWindowAssigner()) \
+ .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .add_sink(self.test_sink)
+
+ self.env.execute('test_session_window_late_merge')
+ results = self.test_sink.get_results()
+ expected = ['(hi,3)']
+ results.sort()
+ self.assertEqual(expected, results)
+
class StreamingModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
def test_data_stream_name(self):
@@ -1545,12 +1564,31 @@ class SimpleMergeTimeWindowAssigner(MergingWindowAssigner[tuple, TimeWindow]):
def merge_windows(self,
windows: Iterable[TimeWindow],
callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
- window_list = [w for w in windows]
- window_list.sort()
- for i in range(1, len(window_list)):
- if window_list[i - 1].end > window_list[i].start:
- callback.merge([window_list[i - 1], window_list[i]],
- TimeWindow(window_list[i - 1].start, window_list[i].end))
+ sorted_windows = list(windows)
+ sorted_windows.sort()
+ merged = []
+ current_merge = None
+ current_merge_set = set()
+
+ for candidate in sorted_windows:
+ if current_merge is None:
+ current_merge = candidate
+ current_merge_set.add(candidate)
+ elif current_merge.intersects(candidate):
+ current_merge = current_merge.cover(candidate)
+ current_merge_set.add(candidate)
+ else:
+ merged.append((current_merge, current_merge_set))
+ current_merge = candidate
+ current_merge_set = set()
+ current_merge_set.add(candidate)
+
+ if current_merge is not None:
+ merged.append((current_merge, current_merge_set))
+
+ for merge_key, merge_set in merged:
+ if len(merge_set) > 1:
+ callback.merge(merge_set, merge_key)
def assign_windows(self,
element: tuple,
diff --git a/flink-python/pyflink/fn_execution/datastream/merging_window_set.py b/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
index 0bc33f8..14d95f1 100644
--- a/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
+++ b/flink-python/pyflink/fn_execution/datastream/merging_window_set.py
@@ -102,7 +102,7 @@ class MergingWindowSet(Generic[W]):
self._mapping[merge_result] = merged_state_window
merged_state_windows.remove(merged_state_window)
- if merge_result not in merged_windows and len(merged_windows) == 1:
+ if merge_result not in merged_windows or len(merged_windows) != 1:
merge_function.merge(
merge_result,
merged_windows,
diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py
index 79d6de9..50a2115 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -1063,13 +1063,14 @@ class RemoteKeyedStateBackend(object):
self._map_state_handler.clear(self._clear_iterator_mark)
def merge_namespaces(self, state: SynchronousMergingRuntimeState, target, sources):
+ for source in sources:
+ state.set_current_namespace(source)
+ self.commit_internal_state(state.get_internal_state())
state.set_current_namespace(target)
self.commit_internal_state(state.get_internal_state())
encoded_target_namespace = self._encode_namespace(target)
- encoded_namespaces = [encoded_target_namespace]
- for source in sources:
- encoded_namespaces.append(self._encode_namespace(source))
- self.clear_state_cache(state, encoded_namespaces)
+ encoded_namespaces = [self._encode_namespace(source) for source in sources]
+ self.clear_state_cache(state, [encoded_target_namespace] + encoded_namespaces)
state_key = self.get_bag_state_key(
state.name, self._encoded_current_key, encoded_target_namespace)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
index 76a8a45..d5206fb 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
@@ -203,6 +205,15 @@ public enum PythonOperatorUtils {
}
}
+ /** Set the current key for the timer service. */
+ public static <K, N> void setCurrentKeyForTimerService(
+ InternalTimerService<N> internalTimerService, K currentKey) throws Exception {
+ if (internalTimerService instanceof BatchExecutionInternalTimeService) {
+ ((BatchExecutionInternalTimeService<K, N>) internalTimerService)
+ .setCurrentKey(currentKey);
+ }
+ }
+
public static <K> boolean inBatchExecutionMode(KeyedStateBackend<K> stateBackend) {
return stateBackend instanceof BatchExecutionKeyedStateBackend;
}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
index 7350e7a..a5ac5cf 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
@@ -29,10 +29,9 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.types.Row;
-import java.io.IOException;
-
/** This handler can accepts the runner output which contains timer registration event. */
public class OutputWithTimerRowHandler {
@@ -59,7 +58,7 @@ public class OutputWithTimerRowHandler {
this.baisWrapper = new DataInputViewStreamWrapper(bais);
}
- public void accept(Row runnerOutput, long timestamp) throws IOException {
+ public void accept(Row runnerOutput, long timestamp) throws Exception {
switch (RunnerOutputType.valueOf((byte) runnerOutput.getField(0))) {
case NORMAL_RECORD:
onData(timestamp, runnerOutput.getField(1));
@@ -87,9 +86,10 @@ public class OutputWithTimerRowHandler {
}
private void onTimerOperation(
- TimerOperandType operandType, long time, Row key, Object namespace) {
+ TimerOperandType operandType, long time, Row key, Object namespace) throws Exception {
synchronized (keyedStateBackend) {
keyContext.setCurrentKey(key);
+ PythonOperatorUtils.setCurrentKeyForTimerService(internalTimerService, key);
switch (operandType) {
case REGISTER_EVENT_TIMER:
internalTimerService.registerEventTimeTimer(namespace, time);