You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by he...@apache.org on 2020/08/16 13:13:13 UTC
[flink] branch master updated: [FLINK-18945][python] Support
CoFlatMap for Python DataStream API (#13152)
This is an automated email from the ASF dual-hosted git repository.
hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ecc3429 [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152)
ecc3429 is described below
commit ecc342904404c272939196cb9b30803cc518faa6
Author: Hequn Cheng <he...@alibaba-inc.com>
AuthorDate: Sun Aug 16 21:12:04 2020 +0800
[FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152)
---
flink-python/pyflink/datastream/data_stream.py | 38 ++++++++++++++--
flink-python/pyflink/datastream/functions.py | 53 +++++++++++++++++++++-
.../pyflink/datastream/tests/test_data_stream.py | 44 +++++++++++++++++-
.../pyflink/fn_execution/operation_utils.py | 8 ++++
4 files changed, 136 insertions(+), 7 deletions(-)
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 17abb70..a5f9c1d 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -24,7 +24,7 @@ from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.functions import _get_python_env, FlatMapFunctionWrapper, FlatMapFunction, \
MapFunction, MapFunctionWrapper, Function, FunctionWrapper, SinkFunction, FilterFunction, \
FilterFunctionWrapper, KeySelectorFunctionWrapper, KeySelector, ReduceFunction, \
- ReduceFunctionWrapper, CoMapFunction
+ ReduceFunctionWrapper, CoMapFunction, CoFlatMapFunction
from pyflink.java_gateway import get_gateway
@@ -799,7 +799,7 @@ class ConnectedStreams(object):
self.stream1 = stream1
self.stream2 = stream2
- def map(self, func: CoMapFunction, type_info: TypeInformation = None) \
+ def map(self, func: CoMapFunction, output_type: TypeInformation = None) \
-> 'DataStream':
"""
Applies a CoMap transformation on a `ConnectedStreams` and maps the output to a common
@@ -819,8 +819,38 @@ class ConnectedStreams(object):
j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type = self._get_connected_stream_operator(
- func, type_info, func_name, flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
- return DataStream(j_connected_stream.transform("Co-Process", j_output_type, j_operator))
+ func,
+ output_type,
+ func_name,
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
+ return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator))
+
+ def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = None) \
+ -> 'DataStream':
+ """
+ Applies a CoFlatMap transformation on a `ConnectedStreams` and maps the output to a
+ common type. The transformation calls a `CoFlatMapFunction.flat_map1` for each element
+ of the first input and `CoFlatMapFunction.flat_map2` for each element of the second
+ input. Each CoFlatMapFunction call returns any number of elements including none.
+
+ :param func: The CoFlatMapFunction used to jointly transform the two input DataStreams
+ :param output_type: `TypeInformation` for the result type of the function.
+ :return: The transformed `DataStream`
+ """
+
+ if not isinstance(func, CoFlatMapFunction):
+ raise TypeError("The input must be a CoFlatMapFunction!")
+ func_name = str(func)
+
+ # get connected stream
+ j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
+ from pyflink.fn_execution import flink_fn_execution_pb2
+ j_operator, j_output_type = self._get_connected_stream_operator(
+ func,
+ output_type,
+ func_name,
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP)
+ return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator))
def _get_connected_stream_operator(self, func: Union[Function, FunctionWrapper],
type_info: TypeInformation, func_name: str,
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index 10433ee..76f5025 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -63,7 +63,7 @@ class CoMapFunction(Function):
The same instance of the transformation function is used to transform both of
the connected streams. That way, the stream transformations can share state.
- The basic syntax for using a MapFunction is as follows:
+ The basic syntax for using a CoMapFunction is as follows:
::
>>> ds1 = ...
>>> ds2 = ...
@@ -121,6 +121,57 @@ class FlatMapFunction(Function):
pass
+class CoFlatMapFunction(Function):
+ """
+ A CoFlatMapFunction implements a flat-map transformation over two connected streams.
+
+ The same instance of the transformation function is used to transform both of the
+ connected streams. That way, the stream transformations can share state.
+
+ An example for the use of connected streams would be to apply rules that change over time
+ onto elements of a stream. One of the connected streams has the rules, the other stream the
+ elements to apply the rules to. The operation on the connected stream maintains the
+ current set of rules in the state. It may receive either a rule update (from the first stream)
+ and update the state, or a data element (from the second stream) and apply the rules in the
+ state to the element. The result of applying the rules would be emitted.
+
+ The basic syntax for using a CoFlatMapFunction is as follows:
+ ::
+ >>> ds1 = ...
+ >>> ds2 = ...
+
+ >>> class MyCoFlatMapFunction(CoFlatMapFunction):
+ >>> def flat_map1(self, value):
+ >>> for i in range(value):
+ >>> yield i
+ >>> def flat_map2(self, value):
+ >>> for i in range(value):
+ >>> yield i
+
+ >>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction())
+ """
+
+ @abc.abstractmethod
+ def flat_map1(self, value):
+ """
+ This method is called for each element in the first of the connected streams.
+
+ :param value: The input value.
+ :return: A generator
+ """
+ pass
+
+ @abc.abstractmethod
+ def flat_map2(self, value):
+ """
+ This method is called for each element in the second of the connected streams.
+
+ :param value: The input value.
+ :return: A generator
+ """
+ pass
+
+
class ReduceFunction(Function):
"""
Base interface for Reduce functions. Reduce functions combine groups of elements to a single
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index d05b2e7..c6c3665 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -22,7 +22,7 @@ from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FilterFunction
from pyflink.datastream.functions import KeySelector
from pyflink.datastream.functions import MapFunction, FlatMapFunction
-from pyflink.datastream.functions import CoMapFunction
+from pyflink.datastream.functions import CoMapFunction, CoFlatMapFunction
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -126,7 +126,7 @@ class DataStreamTests(PyFlinkTestCase):
type_info=Types.ROW([Types.INT(), Types.INT()]))
ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
type_info=Types.ROW([Types.STRING(), Types.STRING()]))
- ds1.connect(ds2).map(MyCoMapFunction(), type_info=Types.STRING()).add_sink(self.test_sink)
+ ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
self.env.execute('co_map_function_test')
results = self.test_sink.get_results(False)
expected = ['2', '3', '4', 'a', 'b', 'c']
@@ -178,6 +178,35 @@ class DataStreamTests(PyFlinkTestCase):
expected.sort()
self.assertEqual(expected, results)
+ def test_co_flat_map_function_without_data_types(self):
+ self.env.set_parallelism(1)
+ ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)],
+ type_info=Types.ROW([Types.INT(), Types.INT()]))
+ ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
+ type_info=Types.ROW([Types.STRING(), Types.STRING()]))
+ ds1.connect(ds2).flat_map(MyCoFlatMapFunction()).add_sink(self.test_sink)
+ self.env.execute('co_flat_map_function_test')
+ results = self.test_sink.get_results(True)
+ expected = ['2', '2', '3', '3', '4', '4', 'b']
+ expected.sort()
+ results.sort()
+ self.assertEqual(expected, results)
+
+ def test_co_flat_map_function_with_data_types(self):
+ self.env.set_parallelism(1)
+ ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)],
+ type_info=Types.ROW([Types.INT(), Types.INT()]))
+ ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")],
+ type_info=Types.ROW([Types.STRING(), Types.STRING()]))
+ ds1.connect(ds2).flat_map(MyCoFlatMapFunction(), output_type=Types.STRING())\
+ .add_sink(self.test_sink)
+ self.env.execute('co_flat_map_function_test')
+ results = self.test_sink.get_results(False)
+ expected = ['2', '2', '3', '3', '4', '4', 'b']
+ expected.sort()
+ results.sort()
+ self.assertEqual(expected, results)
+
def test_filter_without_data_types(self):
ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
ds.filter(MyFilterFunction()).add_sink(self.test_sink)
@@ -469,3 +498,14 @@ class MyCoMapFunction(CoMapFunction):
def map2(self, value):
return value[0]
+
+
+class MyCoFlatMapFunction(CoFlatMapFunction):
+
+ def flat_map1(self, value):
+ yield str(value[0] + 1)
+ yield str(value[0] + 1)
+
+ def flat_map2(self, value):
+ if value[0] == 'b':
+ yield value[0]
diff --git a/flink-python/pyflink/fn_execution/operation_utils.py b/flink-python/pyflink/fn_execution/operation_utils.py
index c361681..a85cd55 100644
--- a/flink-python/pyflink/fn_execution/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/operation_utils.py
@@ -114,6 +114,14 @@ def extract_data_stream_stateless_funcs(udfs):
def wrap_func(value):
return co_map_func.map1(value[1]) if value[0] else co_map_func.map2(value[2])
func = wrap_func
+ elif func_type == udf.CO_FLAT_MAP:
+ co_flat_map_func = cloudpickle.loads(udfs[0].payload)
+
+ def wrap_func(value):
+ return co_flat_map_func.flat_map1(
+ value[1]) if value[0] else co_flat_map_func.flat_map2(
+ value[2])
+ func = wrap_func
return func