You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/01/06 11:35:39 UTC

[flink] branch release-1.13 updated: [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 1dc9faf  [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
1dc9faf is described below

commit 1dc9faf75907c9a6b4051335b5a3083802ea053a
Author: atptour2017 <49...@qq.com>
AuthorDate: Thu Jan 6 13:45:02 2022 +0800

    [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
    
    This closes #18280.
---
 flink-python/pyflink/datastream/data_stream.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 48e8113..68ed512 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -1303,10 +1303,14 @@ class ConnectedStreams(object):
                     self._underlying.open(runtime_context)
 
                 def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield self._underlying.map1(value)
+                    result = self._underlying.map1(value)
+                    if result is not None:
+                        yield result
 
                 def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield self._underlying.map2(value)
+                    result = self._underlying.map2(value)
+                    if result is not None:
+                        yield result
 
                 def close(self):
                     self._underlying.close()
@@ -1350,10 +1354,14 @@ class ConnectedStreams(object):
                     self._underlying.open(runtime_context)
 
                 def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield from self._underlying.flat_map1(value)
+                    result = self._underlying.flat_map1(value)
+                    if result:
+                        yield from result
 
                 def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield from self._underlying.flat_map2(value)
+                    result = self._underlying.flat_map2(value)
+                    if result:
+                        yield from result
 
                 def close(self):
                     self._underlying.close()