You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/01/06 11:35:39 UTC
[flink] branch release-1.13 updated: [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 1dc9faf [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
1dc9faf is described below
commit 1dc9faf75907c9a6b4051335b5a3083802ea053a
Author: atptour2017 <49...@qq.com>
AuthorDate: Thu Jan 6 13:45:02 2022 +0800
[FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
This closes #18280.
---
flink-python/pyflink/datastream/data_stream.py | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 48e8113..68ed512 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -1303,10 +1303,14 @@ class ConnectedStreams(object):
self._underlying.open(runtime_context)
def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield self._underlying.map1(value)
+ result = self._underlying.map1(value)
+ if result is not None:
+ yield result
def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield self._underlying.map2(value)
+ result = self._underlying.map2(value)
+ if result is not None:
+ yield result
def close(self):
self._underlying.close()
@@ -1350,10 +1354,14 @@ class ConnectedStreams(object):
self._underlying.open(runtime_context)
def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield from self._underlying.flat_map1(value)
+ result = self._underlying.flat_map1(value)
+ if result:
+ yield from result
def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield from self._underlying.flat_map2(value)
+ result = self._underlying.flat_map2(value)
+ if result:
+ yield from result
def close(self):
self._underlying.close()