You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/08 12:59:11 UTC
[flink] branch master updated: [FLINK-28788][python] Support SideOutput in Thread Mode
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7047916a487 [FLINK-28788][python] Support SideOutput in Thread Mode
7047916a487 is described below
commit 7047916a487ed68c8e01ad005384b787501345cb
Author: huangxingbo <hx...@apache.org>
AuthorDate: Mon Aug 8 12:58:33 2022 +0800
[FLINK-28788][python] Support SideOutput in Thread Mode
This closes #20488.
---
.../pyflink/datastream/tests/test_data_stream.py | 318 ++++++++++-----------
.../pyflink/datastream/tests/test_window.py | 14 +-
.../fn_execution/datastream/embedded/operations.py | 41 ++-
.../datastream/embedded/process_function.py | 6 +-
.../datastream/embedded/side_output_context.py | 46 +++
.../fn_execution/embedded/operation_utils.py | 6 +-
.../pyflink/fn_execution/embedded/operations.py | 4 +
.../chain/PythonOperatorChainingOptimizer.java | 45 ++-
.../apache/flink/python/util/PythonConfigUtil.java | 9 +-
.../python/DataStreamPythonFunctionOperator.java | 9 +
...ctEmbeddedDataStreamPythonFunctionOperator.java | 71 +++++
...ractOneInputEmbeddedPythonFunctionOperator.java | 2 +
...ractTwoInputEmbeddedPythonFunctionOperator.java | 2 +
.../embedded/EmbeddedPythonCoProcessOperator.java | 2 +-
.../EmbeddedPythonKeyedCoProcessOperator.java | 2 +-
.../EmbeddedPythonKeyedProcessOperator.java | 2 +-
.../embedded/EmbeddedPythonProcessOperator.java | 2 +-
.../embedded/EmbeddedPythonWindowOperator.java | 2 +-
...ctExternalDataStreamPythonFunctionOperator.java | 8 +-
19 files changed, 371 insertions(+), 220 deletions(-)
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 5b62715e3af..79165d40490 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -526,6 +526,165 @@ class DataStreamTests(object):
]
self.assert_equals_sorted(expected, self.test_sink.get_results())
+ def test_side_output_chained_with_upstream_operator(self):
+ tag = OutputTag("side", Types.INT())
+
+ ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+ type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+ class MyProcessFunction(ProcessFunction):
+
+ def process_element(self, value, ctx: 'ProcessFunction.Context'):
+ yield value[0]
+ yield tag, value[1]
+
+ ds2 = ds.map(lambda e: (e[0], e[1]+1)) \
+ .process(MyProcessFunction(), output_type=Types.STRING())
+ main_sink = DataStreamTestSinkFunction()
+ ds2.add_sink(main_sink)
+ side_sink = DataStreamTestSinkFunction()
+ ds2.get_side_output(tag).add_sink(side_sink)
+
+ self.env.execute("test_side_output_chained_with_upstream_operator")
+ main_expected = ['a', 'b', 'c']
+ self.assert_equals_sorted(main_expected, main_sink.get_results())
+ side_expected = ['1', '2', '3']
+ self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+ def test_process_multiple_side_output(self):
+ tag1 = OutputTag("side1", Types.INT())
+ tag2 = OutputTag("side2", Types.STRING())
+
+ ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+ type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+ class MyProcessFunction(ProcessFunction):
+
+ def process_element(self, value, ctx: 'ProcessFunction.Context'):
+ yield value[0]
+ yield tag1, value[1]
+ yield tag2, value[0] + str(value[1])
+
+ ds2 = ds.process(MyProcessFunction(), output_type=Types.STRING())
+ main_sink = DataStreamTestSinkFunction()
+ ds2.add_sink(main_sink)
+ side1_sink = DataStreamTestSinkFunction()
+ ds2.get_side_output(tag1).add_sink(side1_sink)
+ side2_sink = DataStreamTestSinkFunction()
+ ds2.get_side_output(tag2).add_sink(side2_sink)
+
+ self.env.execute("test_process_multiple_side_output")
+ main_expected = ['a', 'b', 'c']
+ self.assert_equals_sorted(main_expected, main_sink.get_results())
+ side1_expected = ['0', '1', '2']
+ self.assert_equals_sorted(side1_expected, side1_sink.get_results())
+ side2_expected = ['a0', 'b1', 'c2']
+ self.assert_equals_sorted(side2_expected, side2_sink.get_results())
+
+ def test_co_process_side_output(self):
+ tag = OutputTag("side", Types.INT())
+
+ class MyCoProcessFunction(CoProcessFunction):
+
+ def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
+ yield value[0]
+ yield tag, value[1]
+
+ def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
+ yield value[1]
+ yield tag, value[0]
+
+ ds1 = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+ type_info=Types.ROW([Types.STRING(), Types.INT()]))
+ ds2 = self.env.from_collection([(3, 'c'), (1, 'a'), (0, 'd')],
+ type_info=Types.ROW([Types.INT(), Types.STRING()]))
+ ds3 = ds1.connect(ds2).process(MyCoProcessFunction(), output_type=Types.STRING())
+ ds3.add_sink(self.test_sink)
+ side_sink = DataStreamTestSinkFunction()
+ ds3.get_side_output(tag).add_sink(side_sink)
+
+ self.env.execute("test_co_process_side_output")
+ main_expected = ['a', 'a', 'b', 'c', 'c', 'd']
+ self.assert_equals_sorted(main_expected, self.test_sink.get_results())
+ side_expected = ['0', '0', '1', '1', '2', '3']
+ self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+ def test_keyed_process_side_output(self):
+ tag = OutputTag("side", Types.INT())
+
+ ds = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
+ type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+ class MyKeyedProcessFunction(KeyedProcessFunction):
+
+ def __init__(self):
+ self.reducing_state = None # type: ReducingState
+
+ def open(self, context: RuntimeContext):
+ self.reducing_state = context.get_reducing_state(
+ ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
+ )
+
+ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+ yield value[1]
+ self.reducing_state.add(value[1])
+ yield tag, self.reducing_state.get()
+
+ ds2 = ds.key_by(lambda e: e[0]).process(MyKeyedProcessFunction(),
+ output_type=Types.INT())
+ main_sink = DataStreamTestSinkFunction()
+ ds2.add_sink(main_sink)
+ side_sink = DataStreamTestSinkFunction()
+ ds2.get_side_output(tag).add_sink(side_sink)
+
+ self.env.execute("test_keyed_process_side_output")
+ main_expected = ['1', '2', '3', '4']
+ self.assert_equals_sorted(main_expected, main_sink.get_results())
+ side_expected = ['1', '2', '4', '6']
+ self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+ def test_keyed_co_process_side_output(self):
+ tag = OutputTag("side", Types.INT())
+
+ ds1 = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
+ type_info=Types.ROW([Types.STRING(), Types.INT()]))
+ ds2 = self.env.from_collection([(8, 'a'), (7, 'b'), (6, 'a'), (5, 'b')],
+ type_info=Types.ROW([Types.INT(), Types.STRING()]))
+
+ class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
+
+ def __init__(self):
+ self.reducing_state = None # type: ReducingState
+
+ def open(self, context: RuntimeContext):
+ self.reducing_state = context.get_reducing_state(
+ ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
+ )
+
+ def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
+ yield value[1]
+ self.reducing_state.add(1)
+ yield tag, self.reducing_state.get()
+
+ def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
+ yield value[0]
+ self.reducing_state.add(1)
+ yield tag, self.reducing_state.get()
+
+ ds3 = ds1.key_by(lambda e: e[0])\
+ .connect(ds2.key_by(lambda e: e[1]))\
+ .process(MyKeyedCoProcessFunction(), output_type=Types.INT())
+ main_sink = DataStreamTestSinkFunction()
+ ds3.add_sink(main_sink)
+ side_sink = DataStreamTestSinkFunction()
+ ds3.get_side_output(tag).add_sink(side_sink)
+
+ self.env.execute("test_keyed_co_process_side_output")
+ main_expected = ['1', '2', '3', '4', '5', '6', '7', '8']
+ self.assert_equals_sorted(main_expected, main_sink.get_results())
+ side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
+ self.assert_equals_sorted(side_expected, side_sink.get_results())
+
class DataStreamStreamingTests(DataStreamTests):
@@ -996,165 +1155,6 @@ class ProcessDataStreamTests(DataStreamTests):
side_expected = ['0', '1', '2']
self.assert_equals_sorted(side_expected, side_sink.get_results())
- def test_side_output_chained_with_upstream_operator(self):
- tag = OutputTag("side", Types.INT())
-
- ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
- type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
- class MyProcessFunction(ProcessFunction):
-
- def process_element(self, value, ctx: 'ProcessFunction.Context'):
- yield value[0]
- yield tag, value[1]
-
- ds2 = ds.map(lambda e: (e[0], e[1]+1)) \
- .process(MyProcessFunction(), output_type=Types.STRING())
- main_sink = DataStreamTestSinkFunction()
- ds2.add_sink(main_sink)
- side_sink = DataStreamTestSinkFunction()
- ds2.get_side_output(tag).add_sink(side_sink)
-
- self.env.execute("test_side_output_chained_with_upstream_operator")
- main_expected = ['a', 'b', 'c']
- self.assert_equals_sorted(main_expected, main_sink.get_results())
- side_expected = ['1', '2', '3']
- self.assert_equals_sorted(side_expected, side_sink.get_results())
-
- def test_process_multiple_side_output(self):
- tag1 = OutputTag("side1", Types.INT())
- tag2 = OutputTag("side2", Types.STRING())
-
- ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
- type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
- class MyProcessFunction(ProcessFunction):
-
- def process_element(self, value, ctx: 'ProcessFunction.Context'):
- yield value[0]
- yield tag1, value[1]
- yield tag2, value[0] + str(value[1])
-
- ds2 = ds.process(MyProcessFunction(), output_type=Types.STRING())
- main_sink = DataStreamTestSinkFunction()
- ds2.add_sink(main_sink)
- side1_sink = DataStreamTestSinkFunction()
- ds2.get_side_output(tag1).add_sink(side1_sink)
- side2_sink = DataStreamTestSinkFunction()
- ds2.get_side_output(tag2).add_sink(side2_sink)
-
- self.env.execute("test_process_multiple_side_output")
- main_expected = ['a', 'b', 'c']
- self.assert_equals_sorted(main_expected, main_sink.get_results())
- side1_expected = ['0', '1', '2']
- self.assert_equals_sorted(side1_expected, side1_sink.get_results())
- side2_expected = ['a0', 'b1', 'c2']
- self.assert_equals_sorted(side2_expected, side2_sink.get_results())
-
- def test_co_process_side_output(self):
- tag = OutputTag("side", Types.INT())
-
- class MyCoProcessFunction(CoProcessFunction):
-
- def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
- yield value[0]
- yield tag, value[1]
-
- def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
- yield value[1]
- yield tag, value[0]
-
- ds1 = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
- type_info=Types.ROW([Types.STRING(), Types.INT()]))
- ds2 = self.env.from_collection([(3, 'c'), (1, 'a'), (0, 'd')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
- ds3 = ds1.connect(ds2).process(MyCoProcessFunction(), output_type=Types.STRING())
- ds3.add_sink(self.test_sink)
- side_sink = DataStreamTestSinkFunction()
- ds3.get_side_output(tag).add_sink(side_sink)
-
- self.env.execute("test_co_process_side_output")
- main_expected = ['a', 'a', 'b', 'c', 'c', 'd']
- self.assert_equals_sorted(main_expected, self.test_sink.get_results())
- side_expected = ['0', '0', '1', '1', '2', '3']
- self.assert_equals_sorted(side_expected, side_sink.get_results())
-
- def test_keyed_process_side_output(self):
- tag = OutputTag("side", Types.INT())
-
- ds = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
- type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
- class MyKeyedProcessFunction(KeyedProcessFunction):
-
- def __init__(self):
- self.reducing_state = None # type: ReducingState
-
- def open(self, context: RuntimeContext):
- self.reducing_state = context.get_reducing_state(
- ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
- )
-
- def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
- yield value[1]
- self.reducing_state.add(value[1])
- yield tag, self.reducing_state.get()
-
- ds2 = ds.key_by(lambda e: e[0]).process(MyKeyedProcessFunction(),
- output_type=Types.INT())
- main_sink = DataStreamTestSinkFunction()
- ds2.add_sink(main_sink)
- side_sink = DataStreamTestSinkFunction()
- ds2.get_side_output(tag).add_sink(side_sink)
-
- self.env.execute("test_keyed_process_side_output")
- main_expected = ['1', '2', '3', '4']
- self.assert_equals_sorted(main_expected, main_sink.get_results())
- side_expected = ['1', '2', '4', '6']
- self.assert_equals_sorted(side_expected, side_sink.get_results())
-
- def test_keyed_co_process_side_output(self):
- tag = OutputTag("side", Types.INT())
-
- ds1 = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
- type_info=Types.ROW([Types.STRING(), Types.INT()]))
- ds2 = self.env.from_collection([(8, 'a'), (7, 'b'), (6, 'a'), (5, 'b')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
-
- class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
-
- def __init__(self):
- self.reducing_state = None # type: ReducingState
-
- def open(self, context: RuntimeContext):
- self.reducing_state = context.get_reducing_state(
- ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
- )
-
- def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield value[1]
- self.reducing_state.add(1)
- yield tag, self.reducing_state.get()
-
- def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
- yield value[0]
- self.reducing_state.add(1)
- yield tag, self.reducing_state.get()
-
- ds3 = ds1.key_by(lambda e: e[0])\
- .connect(ds2.key_by(lambda e: e[1]))\
- .process(MyKeyedCoProcessFunction(), output_type=Types.INT())
- main_sink = DataStreamTestSinkFunction()
- ds3.add_sink(main_sink)
- side_sink = DataStreamTestSinkFunction()
- ds3.get_side_output(tag).add_sink(side_sink)
-
- self.env.execute("test_keyed_co_process_side_output")
- main_expected = ['1', '2', '3', '4', '5', '6', '7', '8']
- self.assert_equals_sorted(main_expected, main_sink.get_results())
- side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
- self.assert_equals_sorted(side_expected, side_sink.get_results())
-
class ProcessDataStreamStreamingTests(DataStreamStreamingTests, ProcessDataStreamTests,
PyFlinkStreamingTestCase):
diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index 886e8456949..04081080653 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -557,13 +557,6 @@ class WindowTests(object):
'key a timestamp sum 15']
self.assert_equals_sorted(expected, results)
-
-class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
- def setUp(self) -> None:
- super(ProcessWindowTests, self).setUp()
- config = get_j_env_configuration(self.env._j_stream_execution_environment)
- config.setString("python.execution-mode", "process")
-
def test_side_output_late_data(self):
self.env.set_parallelism(1)
config = Configuration(
@@ -600,6 +593,13 @@ class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
self.assert_equals_sorted(side_expected, side_sink.get_results())
+class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
+ def setUp(self) -> None:
+ super(ProcessWindowTests, self).setUp()
+ config = get_j_env_configuration(self.env._j_stream_execution_environment)
+ config.setString("python.execution-mode", "process")
+
+
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
class EmbeddedWindowTests(WindowTests, PyFlinkStreamingTestCase):
def setUp(self) -> None:
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
index beef4be98a8..c986e58152e 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.datastream import OutputTag
from pyflink.datastream.window import WindowOperationDescriptor
from pyflink.fn_execution import pickle
from pyflink.fn_execution.coders import TimeWindowCoder, CountWindowCoder
@@ -27,6 +28,7 @@ from pyflink.fn_execution.datastream.embedded.process_function import (
InternalKeyedBroadcastProcessFunctionReadOnlyContext,
InternalKeyedBroadcastProcessFunctionOnTimerContext)
from pyflink.fn_execution.datastream.embedded.runtime_context import StreamingRuntimeContext
+from pyflink.fn_execution.datastream.embedded.side_output_context import SideOutputContext
from pyflink.fn_execution.datastream.embedded.timerservice_impl import InternalTimerServiceImpl
from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
from pyflink.fn_execution.embedded.converters import (TimeWindowConverter, CountWindowConverter,
@@ -84,7 +86,7 @@ class TwoInputOperation(operations.TwoInputOperation):
def extract_process_function(
user_defined_function_proto, j_runtime_context, j_function_context, j_timer_context,
- job_parameters, j_keyed_state_backend, j_operator_state_backend):
+ j_side_output_context, job_parameters, j_keyed_state_backend, j_operator_state_backend):
from pyflink.fn_execution import flink_fn_execution_pb2
user_defined_func = pickle.loads(user_defined_function_proto.payload)
@@ -95,6 +97,20 @@ def extract_process_function(
runtime_context = StreamingRuntimeContext.of(j_runtime_context, job_parameters)
+ if j_side_output_context:
+ side_output_context = SideOutputContext(j_side_output_context)
+
+ def process_func(values):
+ for value in values:
+ if isinstance(value, tuple) and isinstance(value[0], OutputTag):
+ output_tag = value[0] # type: OutputTag
+ side_output_context.collect(output_tag.tag_id, value[1])
+ else:
+ yield value
+ else:
+ def process_func(values):
+ yield from values
+
def open_func():
if hasattr(user_defined_func, "open"):
user_defined_func.open(runtime_context)
@@ -109,7 +125,7 @@ def extract_process_function(
process_element = user_defined_func.process_element
def process_element_func(value):
- yield from process_element(value, function_context)
+ yield from process_func(process_element(value, function_context))
return OneInputOperation(open_func, close_func, process_element_func)
@@ -133,10 +149,10 @@ def extract_process_function(
return user_defined_func.process_element(value[1], context)
def on_timer_func(timestamp):
- yield from on_timer(timestamp, timer_context)
+ yield from process_func(on_timer(timestamp, timer_context))
def process_element_func(value):
- yield from process_element(value, function_context)
+ yield from process_func(process_element(value, function_context))
return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
@@ -147,10 +163,10 @@ def extract_process_function(
process_element2 = user_defined_func.process_element2
def process_element_func1(value):
- yield from process_element1(value, function_context)
+ yield from process_func(process_element1(value, function_context))
def process_element_func2(value):
- yield from process_element2(value, function_context)
+ yield from process_func(process_element2(value, function_context))
return TwoInputOperation(
open_func, close_func, process_element_func1, process_element_func2)
@@ -203,13 +219,13 @@ def extract_process_function(
return user_defined_func.process_element2(value[1], context)
def on_timer_func(timestamp):
- yield from on_timer(timestamp, timer_context)
+ yield from process_func(on_timer(timestamp, timer_context))
def process_element_func1(value):
- yield from process_element1(value, function_context)
+ yield from process_func(process_element1(value, function_context))
def process_element_func2(value):
- yield from process_element2(value, function_context)
+ yield from process_func(process_element2(value, function_context))
return TwoInputOperation(
open_func, close_func, process_element_func1, process_element_func2, on_timer_func)
@@ -307,18 +323,19 @@ def extract_process_function(
window_operator.close()
def process_element_func(value):
- yield from window_operator.process_element(value[1], function_context.timestamp())
+ yield from process_func(
+ window_operator.process_element(value[1], function_context.timestamp()))
if window_assigner.is_event_time():
def on_timer_func(timestamp):
window = window_timer_context.window()
key = window_timer_context.get_current_key()
- yield from window_operator.on_event_time(timestamp, key, window)
+ yield from process_func(window_operator.on_event_time(timestamp, key, window))
else:
def on_timer_func(timestamp):
window = window_timer_context.window()
key = window_timer_context.get_current_key()
- yield from window_operator.on_processing_time(timestamp, key, window)
+ yield from process_func(window_operator.on_processing_time(timestamp, key, window))
return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
index b23b2b3110f..417aa156012 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
@@ -65,7 +65,7 @@ class InternalKeyedProcessFunctionContext(KeyedProcessFunction.Context,
def __init__(self, context, key_type_info):
self._context = context
self._timer_service = TimerServiceImpl(self._context.timerService())
- self._key_converter = from_type_info(key_type_info)
+ self._key_converter = from_type_info_proto(key_type_info)
def get_current_key(self):
return self._key_converter.to_internal(self._context.getCurrentKey())
@@ -85,7 +85,7 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon
def __init__(self, context, key_type_info):
self._context = context
self._timer_service = TimerServiceImpl(self._context.timerService())
- self._key_converter = from_type_info(key_type_info)
+ self._key_converter = from_type_info_proto(key_type_info)
def timer_service(self) -> TimerService:
return self._timer_service
@@ -103,7 +103,7 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon
class InternalWindowTimerContext(object):
def __init__(self, context, key_type_info, window_converter):
self._context = context
- self._key_converter = from_type_info(key_type_info)
+ self._key_converter = from_type_info_proto(key_type_info)
self._window_converter = window_converter
def timestamp(self) -> int:
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py
new file mode 100644
index 00000000000..3580436ed56
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py
@@ -0,0 +1,46 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict
+
+from pyflink.fn_execution.embedded.converters import from_type_info_proto, DataConverter
+
+
+class SideOutputContext(object):
+ def __init__(self, context):
+ self._context = context
+ self._side_output_converters = (
+ {tag_id: from_type_info_proto(_parse_type_info_proto(payload))
+ for tag_id, payload in
+ context.getAllSideOutputTypeInfoPayloads().items()}) # type: Dict[str, DataConverter]
+
+ def collect(self, tag_id: str, record):
+ try:
+ self._context.collectSideOutputById(
+ tag_id,
+ self._side_output_converters[tag_id].to_external(record))
+ except KeyError:
+ raise Exception("Unknown OutputTag id {0}, supported OutputTag ids are {1}".format(
+ tag_id, list(self._side_output_converters.keys())))
+
+
+def _parse_type_info_proto(type_info_payload):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
+ type_info = flink_fn_execution_pb2.TypeInfo()
+ type_info.ParseFromString(type_info_payload)
+ return type_info
diff --git a/flink-python/pyflink/fn_execution/embedded/operation_utils.py b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
index 0dadbb359b9..2f6218131f0 100644
--- a/flink-python/pyflink/fn_execution/embedded/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
@@ -103,7 +103,7 @@ def create_table_operation_from_proto(proto, input_coder_info, output_coder_into
def create_one_input_user_defined_data_stream_function_from_protos(
function_infos, input_coder_info, output_coder_info, runtime_context,
- function_context, timer_context, job_parameters, keyed_state_backend,
+ function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
operator_state_backend):
serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
for proto in function_infos]
@@ -119,6 +119,7 @@ def create_one_input_user_defined_data_stream_function_from_protos(
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
@@ -128,7 +129,7 @@ def create_one_input_user_defined_data_stream_function_from_protos(
def create_two_input_user_defined_data_stream_function_from_protos(
function_infos, input_coder_info1, input_coder_info2, output_coder_info, runtime_context,
- function_context, timer_context, job_parameters, keyed_state_backend,
+ function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
operator_state_backend):
serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
for proto in function_infos]
@@ -150,6 +151,7 @@ def create_two_input_user_defined_data_stream_function_from_protos(
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
diff --git a/flink-python/pyflink/fn_execution/embedded/operations.py b/flink-python/pyflink/fn_execution/embedded/operations.py
index be7771f8aca..d4a251db973 100644
--- a/flink-python/pyflink/fn_execution/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/embedded/operations.py
@@ -66,6 +66,7 @@ class OneInputFunctionOperation(FunctionOperation):
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend):
@@ -75,6 +76,7 @@ class OneInputFunctionOperation(FunctionOperation):
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
@@ -100,6 +102,7 @@ class TwoInputFunctionOperation(FunctionOperation):
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend):
@@ -109,6 +112,7 @@ class TwoInputFunctionOperation(FunctionOperation):
runtime_context,
function_context,
timer_context,
+ side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
index a4e99e9a39b..f8f4a35abbe 100644
--- a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionI
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
@@ -268,19 +267,22 @@ public class PythonOperatorChainingOptimizer {
@SuppressWarnings("unchecked")
private static Transformation<?> createChainedTransformation(
Transformation<?> upTransform, Transformation<?> downTransform) {
- StreamOperator<?> upOperator =
- ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
+ DataStreamPythonFunctionOperator<?> upOperator =
+ (DataStreamPythonFunctionOperator<?>)
+ ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
- StreamOperator<?> downOperator =
- ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform)).getOperator();
+ DataStreamPythonFunctionOperator<?> downOperator =
+ (DataStreamPythonFunctionOperator<?>)
+ ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform))
+ .getOperator();
assert arePythonOperatorsInSameExecutionEnvironment(upOperator, downOperator);
final DataStreamPythonFunctionInfo upPythonFunctionInfo =
- ((DataStreamPythonFunctionOperator) upOperator).getPythonFunctionInfo().copy();
+ upOperator.getPythonFunctionInfo().copy();
final DataStreamPythonFunctionInfo downPythonFunctionInfo =
- ((DataStreamPythonFunctionOperator) downOperator).getPythonFunctionInfo().copy();
+ downOperator.getPythonFunctionInfo().copy();
DataStreamPythonFunctionInfo headPythonFunctionInfoOfDownOperator = downPythonFunctionInfo;
while (headPythonFunctionInfoOfDownOperator.getInputs().length != 0) {
@@ -292,17 +294,10 @@ public class PythonOperatorChainingOptimizer {
new DataStreamPythonFunctionInfo[] {upPythonFunctionInfo});
final DataStreamPythonFunctionOperator chainedOperator =
- ((DataStreamPythonFunctionOperator) upOperator)
- .copy(
- downPythonFunctionInfo,
- ((DataStreamPythonFunctionOperator) downOperator)
- .getProducedType());
- if (chainedOperator instanceof AbstractExternalDataStreamPythonFunctionOperator) {
- ((AbstractExternalDataStreamPythonFunctionOperator) chainedOperator)
- .addSideOutputTags(
- ((AbstractExternalDataStreamPythonFunctionOperator) downOperator)
- .getSideOutputTags());
- }
+ upOperator.copy(
+ downPythonFunctionInfo,
+ ((DataStreamPythonFunctionOperator) downOperator).getProducedType());
+ chainedOperator.addSideOutputTags(downOperator.getSideOutputTags());
PhysicalTransformation<?> chainedTransformation;
if (upOperator instanceof OneInputStreamOperator) {
@@ -411,11 +406,14 @@ public class PythonOperatorChainingOptimizer {
return false;
}
- StreamOperator<?> upOperator =
- ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
+ DataStreamPythonFunctionOperator<?> upOperator =
+ (DataStreamPythonFunctionOperator<?>)
+ ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
- StreamOperator<?> downOperator =
- ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform)).getOperator();
+ DataStreamPythonFunctionOperator<?> downOperator =
+ (DataStreamPythonFunctionOperator<?>)
+ ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform))
+ .getOperator();
if (!arePythonOperatorsInSameExecutionEnvironment(upOperator, downOperator)) {
return false;
@@ -435,7 +433,8 @@ public class PythonOperatorChainingOptimizer {
}
private static boolean arePythonOperatorsInSameExecutionEnvironment(
- StreamOperator<?> upOperator, StreamOperator<?> downOperator) {
+ DataStreamPythonFunctionOperator<?> upOperator,
+ DataStreamPythonFunctionOperator<?> downOperator) {
return upOperator instanceof AbstractExternalDataStreamPythonFunctionOperator
&& downOperator instanceof AbstractExternalDataStreamPythonFunctionOperator
|| upOperator instanceof AbstractEmbeddedDataStreamPythonFunctionOperator
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index e94b2c69e33..733b874859c 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
-import org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -60,6 +59,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -150,11 +150,12 @@ public class PythonConfigUtil {
final Transformation<?> upTransform =
Iterables.getOnlyElement(sideTransform.getInputs());
if (PythonConfigUtil.isPythonDataStreamOperator(upTransform)) {
- final AbstractExternalDataStreamPythonFunctionOperator<?> upOperator =
- (AbstractExternalDataStreamPythonFunctionOperator<?>)
+ final DataStreamPythonFunctionOperator<?> upOperator =
+ (DataStreamPythonFunctionOperator<?>)
((SimpleOperatorFactory<?>) getOperatorFactory(upTransform))
.getOperator();
- upOperator.addSideOutputTag(sideTransform.getOutputTag());
+ upOperator.addSideOutputTags(
+ Collections.singletonList(sideTransform.getOutputTag()));
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
index 4b68f111f24..0afe77c5288 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Collection;
/** Interface for Python DataStream operators. */
@Internal
@@ -36,6 +39,12 @@ public interface DataStreamPythonFunctionOperator<OUT> extends ResultTypeQueryab
/** Returns the underlying {@link DataStreamPythonFunctionInfo}. */
DataStreamPythonFunctionInfo getPythonFunctionInfo();
+ /** Add a collection of {@link OutputTag}s to the operator. */
+ void addSideOutputTags(Collection<OutputTag<?>> outputTags);
+
+ /** Gets the {@link OutputTag}s belongs to the operator. */
+ Collection<OutputTag<?>> getSideOutputTags();
+
/**
* Make a copy of the DataStreamPythonFunctionOperator with the given pythonFunctionInfo and
* outputTypeInfo. This is used for chaining optimization which may need to update the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
index 75ed3e89718..356f4c4e212 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
@@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.operators.python.embedded;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -46,6 +50,8 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
/** The serialized python function to be executed. */
private final DataStreamPythonFunctionInfo pythonFunctionInfo;
+ private final Map<String, OutputTag<?>> sideOutputTags;
+
/** The TypeInformation of output data. */
protected final TypeInformation<OUT> outputTypeInfo;
@@ -56,6 +62,10 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
protected transient TimestampedCollector<OUT> collector;
+ protected transient boolean hasSideOutput;
+
+ protected transient SideOutputContext sideOutputContext;
+
public AbstractEmbeddedDataStreamPythonFunctionOperator(
Configuration config,
DataStreamPythonFunctionInfo pythonFunctionInfo,
@@ -63,10 +73,18 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
super(config);
this.pythonFunctionInfo = Preconditions.checkNotNull(pythonFunctionInfo);
this.outputTypeInfo = Preconditions.checkNotNull(outputTypeInfo);
+ this.sideOutputTags = new HashMap<>();
}
@Override
public void open() throws Exception {
+ hasSideOutput = !sideOutputTags.isEmpty();
+
+ if (hasSideOutput) {
+ sideOutputContext = new SideOutputContext();
+ sideOutputContext.open();
+ }
+
super.open();
outputDataConverter =
@@ -90,6 +108,18 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
return pythonFunctionInfo;
}
+ @Override
+ public void addSideOutputTags(Collection<OutputTag<?>> outputTags) {
+ for (OutputTag<?> outputTag : outputTags) {
+ sideOutputTags.put(outputTag.getId(), outputTag);
+ }
+ }
+
+ @Override
+ public Collection<OutputTag<?>> getSideOutputTags() {
+ return sideOutputTags.values();
+ }
+
public Map<String, String> getJobParameters() {
Map<String, String> jobParameters = new HashMap<>();
if (numPartitions != null) {
@@ -104,4 +134,45 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
}
return jobParameters;
}
+
+ private class SideOutputContext {
+
+ private Map<String, byte[]> sideOutputTypeInfoPayloads = new HashMap<>();
+
+ private Map<String, PythonTypeUtils.DataConverter<Object, Object>>
+ sideOutputDataConverters = new HashMap<>();
+
+ @SuppressWarnings("unchecked")
+ public void open() {
+ for (Map.Entry<String, OutputTag<?>> entry : sideOutputTags.entrySet()) {
+ sideOutputTypeInfoPayloads.put(
+ entry.getKey(), getSideOutputTypeInfoPayload(entry.getValue()));
+
+ sideOutputDataConverters.put(
+ entry.getKey(),
+ PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(
+ (TypeInformation) entry.getValue().getTypeInfo()));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void collectSideOutputById(String id, Object record) {
+ OutputTag<?> outputTag = sideOutputTags.get(id);
+ PythonTypeUtils.DataConverter<Object, Object> sideOutputDataConverter =
+ sideOutputDataConverters.get(id);
+ collector.collect(
+ outputTag, new StreamRecord(sideOutputDataConverter.toInternal(record)));
+ }
+
+ public Map<String, byte[]> getAllSideOutputTypeInfoPayloads() {
+ return sideOutputTypeInfoPayloads;
+ }
+
+ private byte[] getSideOutputTypeInfoPayload(OutputTag<?> outputTag) {
+ FlinkFnApi.TypeInfo outputTypeInfo =
+ PythonTypeUtils.TypeInfoToProtoConverter.toTypeInfoProto(
+ outputTag.getTypeInfo(), getRuntimeContext().getUserCodeClassLoader());
+ return outputTypeInfo.toByteArray();
+ }
+ }
}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
index b02f3bfcb32..6b06d2e5ebf 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
@@ -117,6 +117,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
interpreter.set("job_parameters", getJobParameters());
interpreter.set("keyed_state_backend", getKeyedStateBackend());
interpreter.set("operator_state_backend", getOperatorStateBackend());
+ interpreter.set("side_output_context", sideOutputContext);
interpreter.exec(
"from pyflink.fn_execution.embedded.operation_utils import create_one_input_user_defined_data_stream_function_from_protos");
@@ -129,6 +130,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
+ "runtime_context,"
+ "function_context,"
+ "timer_context,"
+ + "side_output_context,"
+ "job_parameters,"
+ "keyed_state_backend,"
+ "operator_state_backend)");
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
index b0767a309fe..6de7e6b8213 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
@@ -135,6 +135,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
interpreter.set("runtime_context", getRuntimeContext());
interpreter.set("function_context", getFunctionContext());
interpreter.set("timer_context", getTimerContext());
+ interpreter.set("side_output_context", sideOutputContext);
interpreter.set("keyed_state_backend", getKeyedStateBackend());
interpreter.set("job_parameters", getJobParameters());
interpreter.set("operator_state_backend", getOperatorStateBackend());
@@ -151,6 +152,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
+ "runtime_context,"
+ "function_context,"
+ "timer_context,"
+ + "side_output_context,"
+ "job_parameters,"
+ "keyed_state_backend,"
+ "operator_state_backend)");
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
index c6b0d78116c..840c751afe0 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
@@ -77,7 +77,7 @@ public class EmbeddedPythonCoProcessOperator<IN1, IN2, OUT>
inBatchExecutionMode(getKeyedStateBackend()),
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED),
- false,
+ hasSideOutput,
config.get(STATE_CACHE_SIZE),
config.get(MAP_STATE_READ_CACHE_SIZE),
config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
index 65075c37ff9..edeee270208 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
@@ -105,7 +105,7 @@ public class EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>
inBatchExecutionMode(getKeyedStateBackend()),
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED),
- false,
+ hasSideOutput,
config.get(STATE_CACHE_SIZE),
config.get(MAP_STATE_READ_CACHE_SIZE),
config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
index 76fdf58058a..d0db39d37cc 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
@@ -104,7 +104,7 @@ public class EmbeddedPythonKeyedProcessOperator<K, IN, OUT>
inBatchExecutionMode(getKeyedStateBackend()),
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED),
- false,
+ hasSideOutput,
config.get(STATE_CACHE_SIZE),
config.get(MAP_STATE_READ_CACHE_SIZE),
config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
index efcceaddc29..acab1c2f531 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
@@ -76,7 +76,7 @@ public class EmbeddedPythonProcessOperator<IN, OUT>
inBatchExecutionMode(getKeyedStateBackend()),
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED),
- false,
+ hasSideOutput,
config.get(STATE_CACHE_SIZE),
config.get(MAP_STATE_READ_CACHE_SIZE),
config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
index 94c68045205..4756d538333 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
@@ -105,7 +105,7 @@ public class EmbeddedPythonWindowOperator<K, IN, OUT, W extends Window>
inBatchExecutionMode(getKeyedStateBackend()),
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED),
- false,
+ hasSideOutput,
config.get(STATE_CACHE_SIZE),
config.get(MAP_STATE_READ_CACHE_SIZE),
config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
index 31d1b8d5875..73712e895ee 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
@@ -117,17 +117,15 @@ public abstract class AbstractExternalDataStreamPythonFunctionOperator<OUT>
// Side outputs
// ----------------------------------------------------------------------
- public void addSideOutputTag(OutputTag<?> outputTag) {
- sideOutputTags.put(outputTag.getId(), outputTag);
- }
-
protected OutputTag<?> getOutputTagById(String id) {
Preconditions.checkArgument(sideOutputTags.containsKey(id));
return sideOutputTags.get(id);
}
public void addSideOutputTags(Collection<OutputTag<?>> outputTags) {
- outputTags.forEach(this::addSideOutputTag);
+ for (OutputTag<?> outputTag : outputTags) {
+ sideOutputTags.put(outputTag.getId(), outputTag);
+ }
}
public Collection<OutputTag<?>> getSideOutputTags() {