You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by he...@apache.org on 2020/08/16 13:13:13 UTC

[flink] branch master updated: [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152)

This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ecc3429  [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152)
ecc3429 is described below

commit ecc342904404c272939196cb9b30803cc518faa6
Author: Hequn Cheng <he...@alibaba-inc.com>
AuthorDate: Sun Aug 16 21:12:04 2020 +0800

    [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152)
---
 flink-python/pyflink/datastream/data_stream.py     | 38 ++++++++++++++--
 flink-python/pyflink/datastream/functions.py       | 53 +++++++++++++++++++++-
 .../pyflink/datastream/tests/test_data_stream.py   | 44 +++++++++++++++++-
 .../pyflink/fn_execution/operation_utils.py        |  8 ++++
 4 files changed, 136 insertions(+), 7 deletions(-)

diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 17abb70..a5f9c1d 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -24,7 +24,7 @@ from pyflink.common.typeinfo import TypeInformation
 from pyflink.datastream.functions import _get_python_env, FlatMapFunctionWrapper, FlatMapFunction, \
     MapFunction, MapFunctionWrapper, Function, FunctionWrapper, SinkFunction, FilterFunction, \
     FilterFunctionWrapper, KeySelectorFunctionWrapper, KeySelector, ReduceFunction, \
-    ReduceFunctionWrapper, CoMapFunction
+    ReduceFunctionWrapper, CoMapFunction, CoFlatMapFunction
 from pyflink.java_gateway import get_gateway
 
 
@@ -799,7 +799,7 @@ class ConnectedStreams(object):
         self.stream1 = stream1
         self.stream2 = stream2
 
-    def map(self, func: CoMapFunction, type_info: TypeInformation = None) \
+    def map(self, func: CoMapFunction, output_type: TypeInformation = None) \
             -> 'DataStream':
         """
         Applies a CoMap transformation on a `ConnectedStreams` and maps the output to a common
@@ -819,8 +819,38 @@ class ConnectedStreams(object):
         j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
         from pyflink.fn_execution import flink_fn_execution_pb2
         j_operator, j_output_type = self._get_connected_stream_operator(
-            func, type_info, func_name, flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
-        return DataStream(j_connected_stream.transform("Co-Process", j_output_type, j_operator))
+            func,
+            output_type,
+            func_name,
+            flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
+        return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator))
+
+    def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = None) \
+            -> 'DataStream':
+        """
+        Applies a CoFlatMap transformation on a `ConnectedStreams` and maps the output to a
+        common type. The transformation calls a `CoFlatMapFunction.flat_map1` for each element
+        of the first input and `CoFlatMapFunction.flat_map2` for each element of the second
+        input. Each CoFlatMapFunction call returns any number of elements including none.
+
+        :param func: The CoFlatMapFunction used to jointly transform the two input DataStreams
+        :param output_type: `TypeInformation` for the result type of the function.
+        :return: The transformed `DataStream`
+        """
+
+        if not isinstance(func, CoFlatMapFunction):
+            raise TypeError("The input must be a CoFlatMapFunction!")
+        func_name = str(func)
+
+        # get connected stream
+        j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
+        from pyflink.fn_execution import flink_fn_execution_pb2
+        j_operator, j_output_type = self._get_connected_stream_operator(
+            func,
+            output_type,
+            func_name,
+            flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP)
+        return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator))
 
     def _get_connected_stream_operator(self, func: Union[Function, FunctionWrapper],
                                        type_info: TypeInformation, func_name: str,
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index 10433ee..76f5025 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -63,7 +63,7 @@ class CoMapFunction(Function):
     The same instance of the transformation function is used to transform both of
     the connected streams. That way, the stream transformations can share state.
 
-    The basic syntax for using a MapFunction is as follows:
+    The basic syntax for using a CoMapFunction is as follows:
     ::
         >>> ds1 = ...
         >>> ds2 = ...
@@ -121,6 +121,57 @@ class FlatMapFunction(Function):
         pass
 
 
+class CoFlatMapFunction(Function):
+    """
+    A CoFlatMapFunction implements a flat-map transformation over two connected streams.
+
+    The same instance of the transformation function is used to transform both of the
+    connected streams. That way, the stream transformations can share state.
+
+    An example for the use of connected streams would be to apply rules that change over time
+    onto elements of a stream. One of the connected streams has the rules, the other stream the
+    elements to apply the rules to. The operation on the connected stream maintains the
+    current set of rules in the state. It may receive either a rule update (from the first stream)
+    and update the state, or a data element (from the second stream) and apply the rules in the
+    state to the element. The result of applying the rules would be emitted.
+
+    The basic syntax for using a CoFlatMapFunction is as follows:
+    ::
+        >>> ds1 = ...
+        >>> ds2 = ...
+
+        >>> class MyCoFlatMapFunction(CoFlatMapFunction):
+        >>>     def flat_map1(self, value):
+        >>>         for i in range(value):
+        >>>             yield i
+        >>>     def flat_map2(self, value):
+        >>>         for i in range(value):
+        >>>             yield i
+
+        >>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction())
+    """
+
+    @abc.abstractmethod
+    def flat_map1(self, value):
+        """
+        This method is called for each element in the first of the connected streams.
+
+        :param value: The input value.
+        :return: A generator
+        """
+        pass
+
+    @abc.abstractmethod
+    def flat_map2(self, value):
+        """
+        This method is called for each element in the second of the connected streams.
+
+        :param value: The input value.
+        :return: A generator
+        """
+        pass
+
+
 class ReduceFunction(Function):
     """
     Base interface for Reduce functions. Reduce functions combine groups of elements to a single
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index d05b2e7..c6c3665 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -22,7 +22,7 @@ from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.functions import FilterFunction
 from pyflink.datastream.functions import KeySelector
 from pyflink.datastream.functions import MapFunction, FlatMapFunction
-from pyflink.datastream.functions import CoMapFunction
+from pyflink.datastream.functions import CoMapFunction, CoFlatMapFunction
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -126,7 +126,7 @@ class DataStreamTests(PyFlinkTestCase):
                                        type_info=Types.ROW([Types.INT(), Types.INT()]))
         ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
                                        type_info=Types.ROW([Types.STRING(), Types.STRING()]))
-        ds1.connect(ds2).map(MyCoMapFunction(), type_info=Types.STRING()).add_sink(self.test_sink)
+        ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
         self.env.execute('co_map_function_test')
         results = self.test_sink.get_results(False)
         expected = ['2', '3', '4', 'a', 'b', 'c']
@@ -178,6 +178,35 @@ class DataStreamTests(PyFlinkTestCase):
         expected.sort()
         self.assertEqual(expected, results)
 
+    def test_co_flat_map_function_without_data_types(self):
+        self.env.set_parallelism(1)
+        ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)],
+                                       type_info=Types.ROW([Types.INT(), Types.INT()]))
+        ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
+                                       type_info=Types.ROW([Types.STRING(), Types.STRING()]))
+        ds1.connect(ds2).flat_map(MyCoFlatMapFunction()).add_sink(self.test_sink)
+        self.env.execute('co_flat_map_function_test')
+        results = self.test_sink.get_results(True)
+        expected = ['2', '2', '3', '3', '4', '4', 'b']
+        expected.sort()
+        results.sort()
+        self.assertEqual(expected, results)
+
+    def test_co_flat_map_function_with_data_types(self):
+        self.env.set_parallelism(1)
+        ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)],
+                                       type_info=Types.ROW([Types.INT(), Types.INT()]))
+        ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
+                                       type_info=Types.ROW([Types.STRING(), Types.STRING()]))
+        ds1.connect(ds2).flat_map(MyCoFlatMapFunction(), output_type=Types.STRING())\
+            .add_sink(self.test_sink)
+        self.env.execute('co_flat_map_function_test')
+        results = self.test_sink.get_results(False)
+        expected = ['2', '2', '3', '3', '4', '4', 'b']
+        expected.sort()
+        results.sort()
+        self.assertEqual(expected, results)
+
     def test_filter_without_data_types(self):
         ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
         ds.filter(MyFilterFunction()).add_sink(self.test_sink)
@@ -469,3 +498,14 @@ class MyCoMapFunction(CoMapFunction):
 
     def map2(self, value):
         return value[0]
+
+
+class MyCoFlatMapFunction(CoFlatMapFunction):
+
+    def flat_map1(self, value):
+        yield str(value[0] + 1)
+        yield str(value[0] + 1)
+
+    def flat_map2(self, value):
+        if value[0] == 'b':
+            yield value[0]
diff --git a/flink-python/pyflink/fn_execution/operation_utils.py b/flink-python/pyflink/fn_execution/operation_utils.py
index c361681..a85cd55 100644
--- a/flink-python/pyflink/fn_execution/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/operation_utils.py
@@ -114,6 +114,14 @@ def extract_data_stream_stateless_funcs(udfs):
         def wrap_func(value):
             return co_map_func.map1(value[1]) if value[0] else co_map_func.map2(value[2])
         func = wrap_func
+    elif func_type == udf.CO_FLAT_MAP:
+        co_flat_map_func = cloudpickle.loads(udfs[0].payload)
+
+        def wrap_func(value):
+            return co_flat_map_func.flat_map1(
+                value[1]) if value[0] else co_flat_map_func.flat_map2(
+                value[2])
+        func = wrap_func
     return func