Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/08 12:59:11 UTC

[flink] branch master updated: [FLINK-28788][python] Support SideOutput in Thread Mode

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7047916a487 [FLINK-28788][python] Support SideOutput in Thread Mode
7047916a487 is described below

commit 7047916a487ed68c8e01ad005384b787501345cb
Author: huangxingbo <hx...@apache.org>
AuthorDate: Mon Aug 8 12:58:33 2022 +0800

    [FLINK-28788][python] Support SideOutput in Thread Mode
    
    This closes #20488.
---
 .../pyflink/datastream/tests/test_data_stream.py   | 318 ++++++++++-----------
 .../pyflink/datastream/tests/test_window.py        |  14 +-
 .../fn_execution/datastream/embedded/operations.py |  41 ++-
 .../datastream/embedded/process_function.py        |   6 +-
 .../datastream/embedded/side_output_context.py     |  46 +++
 .../fn_execution/embedded/operation_utils.py       |   6 +-
 .../pyflink/fn_execution/embedded/operations.py    |   4 +
 .../chain/PythonOperatorChainingOptimizer.java     |  45 ++-
 .../apache/flink/python/util/PythonConfigUtil.java |   9 +-
 .../python/DataStreamPythonFunctionOperator.java   |   9 +
 ...ctEmbeddedDataStreamPythonFunctionOperator.java |  71 +++++
 ...ractOneInputEmbeddedPythonFunctionOperator.java |   2 +
 ...ractTwoInputEmbeddedPythonFunctionOperator.java |   2 +
 .../embedded/EmbeddedPythonCoProcessOperator.java  |   2 +-
 .../EmbeddedPythonKeyedCoProcessOperator.java      |   2 +-
 .../EmbeddedPythonKeyedProcessOperator.java        |   2 +-
 .../embedded/EmbeddedPythonProcessOperator.java    |   2 +-
 .../embedded/EmbeddedPythonWindowOperator.java     |   2 +-
 ...ctExternalDataStreamPythonFunctionOperator.java |   8 +-
 19 files changed, 371 insertions(+), 220 deletions(-)
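For context: this change brings side-output support, previously available only in
process mode, to thread mode (the embedded Python runtime). The user-facing API is
unchanged — a process function yields (OutputTag, value) tuples and the runtime
routes them to the streams returned by get_side_output. Below is a minimal sketch of
a job exercising it; the configuration pattern mirrors the tests in this patch, and
the value "thread" for python.execution-mode is an assumption here (the patch itself
only shows "process" being set):

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment, OutputTag, ProcessFunction
    from pyflink.util.java_utils import get_j_env_configuration

    env = StreamExecutionEnvironment.get_execution_environment()
    # Run user functions embedded in the JVM (thread mode); "thread" is assumed.
    config = get_j_env_configuration(env._j_stream_execution_environment)
    config.setString("python.execution-mode", "thread")

    side = OutputTag("side", Types.INT())

    class SplitFunction(ProcessFunction):
        def process_element(self, value, ctx):
            yield value[0]        # main output
            yield side, value[1]  # routed to the "side" output

    ds = env.from_collection([('a', 0), ('b', 1)],
                             type_info=Types.ROW([Types.STRING(), Types.INT()]))
    processed = ds.process(SplitFunction(), output_type=Types.STRING())
    processed.print()                        # main stream: a, b
    processed.get_side_output(side).print()  # side stream: 0, 1
    env.execute("side_output_thread_mode_sketch")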

diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 5b62715e3af..79165d40490 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -526,6 +526,165 @@ class DataStreamTests(object):
         ]
         self.assert_equals_sorted(expected, self.test_sink.get_results())
 
+    def test_side_output_chained_with_upstream_operator(self):
+        tag = OutputTag("side", Types.INT())
+
+        ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+        class MyProcessFunction(ProcessFunction):
+
+            def process_element(self, value, ctx: 'ProcessFunction.Context'):
+                yield value[0]
+                yield tag, value[1]
+
+        ds2 = ds.map(lambda e: (e[0], e[1]+1)) \
+            .process(MyProcessFunction(), output_type=Types.STRING())
+        main_sink = DataStreamTestSinkFunction()
+        ds2.add_sink(main_sink)
+        side_sink = DataStreamTestSinkFunction()
+        ds2.get_side_output(tag).add_sink(side_sink)
+
+        self.env.execute("test_side_output_chained_with_upstream_operator")
+        main_expected = ['a', 'b', 'c']
+        self.assert_equals_sorted(main_expected, main_sink.get_results())
+        side_expected = ['1', '2', '3']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+    def test_process_multiple_side_output(self):
+        tag1 = OutputTag("side1", Types.INT())
+        tag2 = OutputTag("side2", Types.STRING())
+
+        ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+        class MyProcessFunction(ProcessFunction):
+
+            def process_element(self, value, ctx: 'ProcessFunction.Context'):
+                yield value[0]
+                yield tag1, value[1]
+                yield tag2, value[0] + str(value[1])
+
+        ds2 = ds.process(MyProcessFunction(), output_type=Types.STRING())
+        main_sink = DataStreamTestSinkFunction()
+        ds2.add_sink(main_sink)
+        side1_sink = DataStreamTestSinkFunction()
+        ds2.get_side_output(tag1).add_sink(side1_sink)
+        side2_sink = DataStreamTestSinkFunction()
+        ds2.get_side_output(tag2).add_sink(side2_sink)
+
+        self.env.execute("test_process_multiple_side_output")
+        main_expected = ['a', 'b', 'c']
+        self.assert_equals_sorted(main_expected, main_sink.get_results())
+        side1_expected = ['0', '1', '2']
+        self.assert_equals_sorted(side1_expected, side1_sink.get_results())
+        side2_expected = ['a0', 'b1', 'c2']
+        self.assert_equals_sorted(side2_expected, side2_sink.get_results())
+
+    def test_co_process_side_output(self):
+        tag = OutputTag("side", Types.INT())
+
+        class MyCoProcessFunction(CoProcessFunction):
+
+            def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
+                yield value[0]
+                yield tag, value[1]
+
+            def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
+                yield value[1]
+                yield tag, value[0]
+
+        ds1 = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds2 = self.env.from_collection([(3, 'c'), (1, 'a'), (0, 'd')],
+                                       type_info=Types.ROW([Types.INT(), Types.STRING()]))
+        ds3 = ds1.connect(ds2).process(MyCoProcessFunction(), output_type=Types.STRING())
+        ds3.add_sink(self.test_sink)
+        side_sink = DataStreamTestSinkFunction()
+        ds3.get_side_output(tag).add_sink(side_sink)
+
+        self.env.execute("test_co_process_side_output")
+        main_expected = ['a', 'a', 'b', 'c', 'c', 'd']
+        self.assert_equals_sorted(main_expected, self.test_sink.get_results())
+        side_expected = ['0', '0', '1', '1', '2', '3']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+    def test_keyed_process_side_output(self):
+        tag = OutputTag("side", Types.INT())
+
+        ds = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
+                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
+
+        class MyKeyedProcessFunction(KeyedProcessFunction):
+
+            def __init__(self):
+                self.reducing_state = None  # type: ReducingState
+
+            def open(self, context: RuntimeContext):
+                self.reducing_state = context.get_reducing_state(
+                    ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
+                )
+
+            def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+                yield value[1]
+                self.reducing_state.add(value[1])
+                yield tag, self.reducing_state.get()
+
+        ds2 = ds.key_by(lambda e: e[0]).process(MyKeyedProcessFunction(),
+                                                output_type=Types.INT())
+        main_sink = DataStreamTestSinkFunction()
+        ds2.add_sink(main_sink)
+        side_sink = DataStreamTestSinkFunction()
+        ds2.get_side_output(tag).add_sink(side_sink)
+
+        self.env.execute("test_keyed_process_side_output")
+        main_expected = ['1', '2', '3', '4']
+        self.assert_equals_sorted(main_expected, main_sink.get_results())
+        side_expected = ['1', '2', '4', '6']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+    def test_keyed_co_process_side_output(self):
+        tag = OutputTag("side", Types.INT())
+
+        ds1 = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
+                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds2 = self.env.from_collection([(8, 'a'), (7, 'b'), (6, 'a'), (5, 'b')],
+                                       type_info=Types.ROW([Types.INT(), Types.STRING()]))
+
+        class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
+
+            def __init__(self):
+                self.reducing_state = None  # type: ReducingState
+
+            def open(self, context: RuntimeContext):
+                self.reducing_state = context.get_reducing_state(
+                    ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
+                )
+
+            def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
+                yield value[1]
+                self.reducing_state.add(1)
+                yield tag, self.reducing_state.get()
+
+            def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
+                yield value[0]
+                self.reducing_state.add(1)
+                yield tag, self.reducing_state.get()
+
+        ds3 = ds1.key_by(lambda e: e[0])\
+            .connect(ds2.key_by(lambda e: e[1]))\
+            .process(MyKeyedCoProcessFunction(), output_type=Types.INT())
+        main_sink = DataStreamTestSinkFunction()
+        ds3.add_sink(main_sink)
+        side_sink = DataStreamTestSinkFunction()
+        ds3.get_side_output(tag).add_sink(side_sink)
+
+        self.env.execute("test_keyed_co_process_side_output")
+        main_expected = ['1', '2', '3', '4', '5', '6', '7', '8']
+        self.assert_equals_sorted(main_expected, main_sink.get_results())
+        side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
 
 class DataStreamStreamingTests(DataStreamTests):
 
@@ -996,165 +1155,6 @@ class ProcessDataStreamTests(DataStreamTests):
         side_expected = ['0', '1', '2']
         self.assert_equals_sorted(side_expected, side_sink.get_results())
 
-    def test_side_output_chained_with_upstream_operator(self):
-        tag = OutputTag("side", Types.INT())
-
-        ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
-                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
-        class MyProcessFunction(ProcessFunction):
-
-            def process_element(self, value, ctx: 'ProcessFunction.Context'):
-                yield value[0]
-                yield tag, value[1]
-
-        ds2 = ds.map(lambda e: (e[0], e[1]+1)) \
-            .process(MyProcessFunction(), output_type=Types.STRING())
-        main_sink = DataStreamTestSinkFunction()
-        ds2.add_sink(main_sink)
-        side_sink = DataStreamTestSinkFunction()
-        ds2.get_side_output(tag).add_sink(side_sink)
-
-        self.env.execute("test_side_output_chained_with_upstream_operator")
-        main_expected = ['a', 'b', 'c']
-        self.assert_equals_sorted(main_expected, main_sink.get_results())
-        side_expected = ['1', '2', '3']
-        self.assert_equals_sorted(side_expected, side_sink.get_results())
-
-    def test_process_multiple_side_output(self):
-        tag1 = OutputTag("side1", Types.INT())
-        tag2 = OutputTag("side2", Types.STRING())
-
-        ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
-                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
-        class MyProcessFunction(ProcessFunction):
-
-            def process_element(self, value, ctx: 'ProcessFunction.Context'):
-                yield value[0]
-                yield tag1, value[1]
-                yield tag2, value[0] + str(value[1])
-
-        ds2 = ds.process(MyProcessFunction(), output_type=Types.STRING())
-        main_sink = DataStreamTestSinkFunction()
-        ds2.add_sink(main_sink)
-        side1_sink = DataStreamTestSinkFunction()
-        ds2.get_side_output(tag1).add_sink(side1_sink)
-        side2_sink = DataStreamTestSinkFunction()
-        ds2.get_side_output(tag2).add_sink(side2_sink)
-
-        self.env.execute("test_process_multiple_side_output")
-        main_expected = ['a', 'b', 'c']
-        self.assert_equals_sorted(main_expected, main_sink.get_results())
-        side1_expected = ['0', '1', '2']
-        self.assert_equals_sorted(side1_expected, side1_sink.get_results())
-        side2_expected = ['a0', 'b1', 'c2']
-        self.assert_equals_sorted(side2_expected, side2_sink.get_results())
-
-    def test_co_process_side_output(self):
-        tag = OutputTag("side", Types.INT())
-
-        class MyCoProcessFunction(CoProcessFunction):
-
-            def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
-                yield value[0]
-                yield tag, value[1]
-
-            def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
-                yield value[1]
-                yield tag, value[0]
-
-        ds1 = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
-                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
-        ds2 = self.env.from_collection([(3, 'c'), (1, 'a'), (0, 'd')],
-                                       type_info=Types.ROW([Types.INT(), Types.STRING()]))
-        ds3 = ds1.connect(ds2).process(MyCoProcessFunction(), output_type=Types.STRING())
-        ds3.add_sink(self.test_sink)
-        side_sink = DataStreamTestSinkFunction()
-        ds3.get_side_output(tag).add_sink(side_sink)
-
-        self.env.execute("test_co_process_side_output")
-        main_expected = ['a', 'a', 'b', 'c', 'c', 'd']
-        self.assert_equals_sorted(main_expected, self.test_sink.get_results())
-        side_expected = ['0', '0', '1', '1', '2', '3']
-        self.assert_equals_sorted(side_expected, side_sink.get_results())
-
-    def test_keyed_process_side_output(self):
-        tag = OutputTag("side", Types.INT())
-
-        ds = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
-                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
-
-        class MyKeyedProcessFunction(KeyedProcessFunction):
-
-            def __init__(self):
-                self.reducing_state = None  # type: ReducingState
-
-            def open(self, context: RuntimeContext):
-                self.reducing_state = context.get_reducing_state(
-                    ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
-                )
-
-            def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
-                yield value[1]
-                self.reducing_state.add(value[1])
-                yield tag, self.reducing_state.get()
-
-        ds2 = ds.key_by(lambda e: e[0]).process(MyKeyedProcessFunction(),
-                                                output_type=Types.INT())
-        main_sink = DataStreamTestSinkFunction()
-        ds2.add_sink(main_sink)
-        side_sink = DataStreamTestSinkFunction()
-        ds2.get_side_output(tag).add_sink(side_sink)
-
-        self.env.execute("test_keyed_process_side_output")
-        main_expected = ['1', '2', '3', '4']
-        self.assert_equals_sorted(main_expected, main_sink.get_results())
-        side_expected = ['1', '2', '4', '6']
-        self.assert_equals_sorted(side_expected, side_sink.get_results())
-
-    def test_keyed_co_process_side_output(self):
-        tag = OutputTag("side", Types.INT())
-
-        ds1 = self.env.from_collection([('a', 1), ('b', 2), ('a', 3), ('b', 4)],
-                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
-        ds2 = self.env.from_collection([(8, 'a'), (7, 'b'), (6, 'a'), (5, 'b')],
-                                       type_info=Types.ROW([Types.INT(), Types.STRING()]))
-
-        class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
-
-            def __init__(self):
-                self.reducing_state = None  # type: ReducingState
-
-            def open(self, context: RuntimeContext):
-                self.reducing_state = context.get_reducing_state(
-                    ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
-                )
-
-            def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                yield value[1]
-                self.reducing_state.add(1)
-                yield tag, self.reducing_state.get()
-
-            def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                yield value[0]
-                self.reducing_state.add(1)
-                yield tag, self.reducing_state.get()
-
-        ds3 = ds1.key_by(lambda e: e[0])\
-            .connect(ds2.key_by(lambda e: e[1]))\
-            .process(MyKeyedCoProcessFunction(), output_type=Types.INT())
-        main_sink = DataStreamTestSinkFunction()
-        ds3.add_sink(main_sink)
-        side_sink = DataStreamTestSinkFunction()
-        ds3.get_side_output(tag).add_sink(side_sink)
-
-        self.env.execute("test_keyed_co_process_side_output")
-        main_expected = ['1', '2', '3', '4', '5', '6', '7', '8']
-        self.assert_equals_sorted(main_expected, main_sink.get_results())
-        side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
-        self.assert_equals_sorted(side_expected, side_sink.get_results())
-
 
 class ProcessDataStreamStreamingTests(DataStreamStreamingTests, ProcessDataStreamTests,
                                       PyFlinkStreamingTestCase):
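Note that the five side-output tests added in the first hunk were moved verbatim out
of ProcessDataStreamTests (the deletions in the second hunk) into the shared
DataStreamTests base class, so they now run in every execution mode rather than in
process mode only. A sketch of how the concrete subclasses typically pin the mode —
the "process" value appears in this patch, while "thread" for the embedded subclass
and the class names are assumptions:

    from pyflink.util.java_utils import get_j_env_configuration

    class ProcessModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
        def setUp(self):
            super().setUp()
            config = get_j_env_configuration(self.env._j_stream_execution_environment)
            config.setString("python.execution-mode", "process")

    class ThreadModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
        def setUp(self):
            super().setUp()
            config = get_j_env_configuration(self.env._j_stream_execution_environment)
            config.setString("python.execution-mode", "thread")  # assumed value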
diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index 886e8456949..04081080653 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -557,13 +557,6 @@ class WindowTests(object):
                     'key a timestamp sum 15']
         self.assert_equals_sorted(expected, results)
 
-
-class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
-    def setUp(self) -> None:
-        super(ProcessWindowTests, self).setUp()
-        config = get_j_env_configuration(self.env._j_stream_execution_environment)
-        config.setString("python.execution-mode", "process")
-
     def test_side_output_late_data(self):
         self.env.set_parallelism(1)
         config = Configuration(
@@ -600,6 +593,13 @@ class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
         self.assert_equals_sorted(side_expected, side_sink.get_results())
 
 
+class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
+    def setUp(self) -> None:
+        super(ProcessWindowTests, self).setUp()
+        config = get_j_env_configuration(self.env._j_stream_execution_environment)
+        config.setString("python.execution-mode", "process")
+
+
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
 class EmbeddedWindowTests(WindowTests, PyFlinkStreamingTestCase):
     def setUp(self) -> None:
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
index beef4be98a8..c986e58152e 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.datastream import OutputTag
 from pyflink.datastream.window import WindowOperationDescriptor
 from pyflink.fn_execution import pickle
 from pyflink.fn_execution.coders import TimeWindowCoder, CountWindowCoder
@@ -27,6 +28,7 @@ from pyflink.fn_execution.datastream.embedded.process_function import (
     InternalKeyedBroadcastProcessFunctionReadOnlyContext,
     InternalKeyedBroadcastProcessFunctionOnTimerContext)
 from pyflink.fn_execution.datastream.embedded.runtime_context import StreamingRuntimeContext
+from pyflink.fn_execution.datastream.embedded.side_output_context import SideOutputContext
 from pyflink.fn_execution.datastream.embedded.timerservice_impl import InternalTimerServiceImpl
 from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
 from pyflink.fn_execution.embedded.converters import (TimeWindowConverter, CountWindowConverter,
@@ -84,7 +86,7 @@ class TwoInputOperation(operations.TwoInputOperation):
 
 def extract_process_function(
         user_defined_function_proto, j_runtime_context, j_function_context, j_timer_context,
-        job_parameters, j_keyed_state_backend, j_operator_state_backend):
+        j_side_output_context, job_parameters, j_keyed_state_backend, j_operator_state_backend):
     from pyflink.fn_execution import flink_fn_execution_pb2
 
     user_defined_func = pickle.loads(user_defined_function_proto.payload)
@@ -95,6 +97,20 @@ def extract_process_function(
 
     runtime_context = StreamingRuntimeContext.of(j_runtime_context, job_parameters)
 
+    if j_side_output_context:
+        side_output_context = SideOutputContext(j_side_output_context)
+
+        def process_func(values):
+            for value in values:
+                if isinstance(value, tuple) and isinstance(value[0], OutputTag):
+                    output_tag = value[0]  # type: OutputTag
+                    side_output_context.collect(output_tag.tag_id, value[1])
+                else:
+                    yield value
+    else:
+        def process_func(values):
+            yield from values
+
     def open_func():
         if hasattr(user_defined_func, "open"):
             user_defined_func.open(runtime_context)
@@ -109,7 +125,7 @@ def extract_process_function(
         process_element = user_defined_func.process_element
 
         def process_element_func(value):
-            yield from process_element(value, function_context)
+            yield from process_func(process_element(value, function_context))
 
         return OneInputOperation(open_func, close_func, process_element_func)
 
@@ -133,10 +149,10 @@ def extract_process_function(
             return user_defined_func.process_element(value[1], context)
 
         def on_timer_func(timestamp):
-            yield from on_timer(timestamp, timer_context)
+            yield from process_func(on_timer(timestamp, timer_context))
 
         def process_element_func(value):
-            yield from process_element(value, function_context)
+            yield from process_func(process_element(value, function_context))
 
         return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
 
@@ -147,10 +163,10 @@ def extract_process_function(
         process_element2 = user_defined_func.process_element2
 
         def process_element_func1(value):
-            yield from process_element1(value, function_context)
+            yield from process_func(process_element1(value, function_context))
 
         def process_element_func2(value):
-            yield from process_element2(value, function_context)
+            yield from process_func(process_element2(value, function_context))
 
         return TwoInputOperation(
             open_func, close_func, process_element_func1, process_element_func2)
@@ -203,13 +219,13 @@ def extract_process_function(
             return user_defined_func.process_element2(value[1], context)
 
         def on_timer_func(timestamp):
-            yield from on_timer(timestamp, timer_context)
+            yield from process_func(on_timer(timestamp, timer_context))
 
         def process_element_func1(value):
-            yield from process_element1(value, function_context)
+            yield from process_func(process_element1(value, function_context))
 
         def process_element_func2(value):
-            yield from process_element2(value, function_context)
+            yield from process_func(process_element2(value, function_context))
 
         return TwoInputOperation(
             open_func, close_func, process_element_func1, process_element_func2, on_timer_func)
@@ -307,18 +323,19 @@ def extract_process_function(
             window_operator.close()
 
         def process_element_func(value):
-            yield from window_operator.process_element(value[1], function_context.timestamp())
+            yield from process_func(
+                window_operator.process_element(value[1], function_context.timestamp()))
 
         if window_assigner.is_event_time():
             def on_timer_func(timestamp):
                 window = window_timer_context.window()
                 key = window_timer_context.get_current_key()
-                yield from window_operator.on_event_time(timestamp, key, window)
+                yield from process_func(window_operator.on_event_time(timestamp, key, window))
         else:
             def on_timer_func(timestamp):
                 window = window_timer_context.window()
                 key = window_timer_context.get_current_key()
-                yield from window_operator.on_processing_time(timestamp, key, window)
+                yield from process_func(window_operator.on_processing_time(timestamp, key, window))
 
         return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
 
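The recurring edit in this file wraps every user-function generator in process_func,
which implements the routing convention: a yielded (OutputTag, value) tuple is
diverted to the side-output channel, anything else passes through to the main
output. A self-contained sketch of that filtering logic, with a plain callable
standing in for the real SideOutputContext:

    from typing import Any, Callable, Iterable

    class OutputTag:
        """Stand-in for pyflink.datastream.OutputTag; only tag_id matters here."""
        def __init__(self, tag_id: str):
            self.tag_id = tag_id

    def make_process_func(side_collect: Callable[[str, Any], None]):
        # Mirrors the wrapper above: OutputTag-tagged tuples go to the side
        # channel, plain values are yielded on to the main output.
        def process_func(values: Iterable[Any]):
            for value in values:
                if isinstance(value, tuple) and isinstance(value[0], OutputTag):
                    side_collect(value[0].tag_id, value[1])
                else:
                    yield value
        return process_func

    side_records = []
    process_func = make_process_func(lambda tag, v: side_records.append((tag, v)))
    assert list(process_func(['a', (OutputTag('side'), 1), 'b'])) == ['a', 'b']
    assert side_records == [('side', 1)]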
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
index b23b2b3110f..417aa156012 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
@@ -65,7 +65,7 @@ class InternalKeyedProcessFunctionContext(KeyedProcessFunction.Context,
     def __init__(self, context, key_type_info):
         self._context = context
         self._timer_service = TimerServiceImpl(self._context.timerService())
-        self._key_converter = from_type_info(key_type_info)
+        self._key_converter = from_type_info_proto(key_type_info)
 
     def get_current_key(self):
         return self._key_converter.to_internal(self._context.getCurrentKey())
@@ -85,7 +85,7 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon
     def __init__(self, context, key_type_info):
         self._context = context
         self._timer_service = TimerServiceImpl(self._context.timerService())
-        self._key_converter = from_type_info(key_type_info)
+        self._key_converter = from_type_info_proto(key_type_info)
 
     def timer_service(self) -> TimerService:
         return self._timer_service
@@ -103,7 +103,7 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon
 class InternalWindowTimerContext(object):
     def __init__(self, context, key_type_info, window_converter):
         self._context = context
-        self._key_converter = from_type_info(key_type_info)
+        self._key_converter = from_type_info_proto(key_type_info)
         self._window_converter = window_converter
 
     def timestamp(self) -> int:
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py
new file mode 100644
index 00000000000..3580436ed56
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/side_output_context.py
@@ -0,0 +1,46 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict
+
+from pyflink.fn_execution.embedded.converters import from_type_info_proto, DataConverter
+
+
+class SideOutputContext(object):
+    def __init__(self, context):
+        self._context = context
+        self._side_output_converters = (
+            {tag_id: from_type_info_proto(_parse_type_info_proto(payload))
+             for tag_id, payload in
+             context.getAllSideOutputTypeInfoPayloads().items()})  # type: Dict[str, DataConverter]
+
+    def collect(self, tag_id: str, record):
+        try:
+            self._context.collectSideOutputById(
+                tag_id,
+                self._side_output_converters[tag_id].to_external(record))
+        except KeyError:
+            raise Exception("Unknown OutputTag id {0}, supported OutputTag ids are {1}".format(
+                tag_id, list(self._side_output_converters.keys())))
+
+
+def _parse_type_info_proto(type_info_payload):
+    from pyflink.fn_execution import flink_fn_execution_pb2
+
+    type_info = flink_fn_execution_pb2.TypeInfo()
+    type_info.ParseFromString(type_info_payload)
+    return type_info
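The new SideOutputContext builds one DataConverter per tag id from the TypeInfo
protobuf payloads the Java operator exposes, and fails loudly on an unknown tag id
instead of silently dropping the record. A sketch of that error path, using a
hypothetical stand-in for the Java-side context object (the real one is handed in
through the embedded interpreter), assuming the module imports cleanly outside it:

    from pyflink.fn_execution.datastream.embedded.side_output_context import (
        SideOutputContext)

    class FakeJavaContext:
        """Hypothetical stand-in for the Java SideOutputContext."""
        def getAllSideOutputTypeInfoPayloads(self):
            return {}  # pretend no side outputs were registered

        def collectSideOutputById(self, tag_id, record):
            pass

    ctx = SideOutputContext(FakeJavaContext())
    try:
        ctx.collect("side", 42)
    except Exception as e:
        print(e)  # Unknown OutputTag id side, supported OutputTag ids are []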
diff --git a/flink-python/pyflink/fn_execution/embedded/operation_utils.py b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
index 0dadbb359b9..2f6218131f0 100644
--- a/flink-python/pyflink/fn_execution/embedded/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
@@ -103,7 +103,7 @@ def create_table_operation_from_proto(proto, input_coder_info, output_coder_into
 
 def create_one_input_user_defined_data_stream_function_from_protos(
         function_infos, input_coder_info, output_coder_info, runtime_context,
-        function_context, timer_context, job_parameters, keyed_state_backend,
+        function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
         operator_state_backend):
     serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
                       for proto in function_infos]
@@ -119,6 +119,7 @@ def create_one_input_user_defined_data_stream_function_from_protos(
         runtime_context,
         function_context,
         timer_context,
+        side_output_context,
         job_parameters,
         keyed_state_backend,
         operator_state_backend)
@@ -128,7 +129,7 @@ def create_one_input_user_defined_data_stream_function_from_protos(
 
 def create_two_input_user_defined_data_stream_function_from_protos(
         function_infos, input_coder_info1, input_coder_info2, output_coder_info, runtime_context,
-        function_context, timer_context, job_parameters, keyed_state_backend,
+        function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
         operator_state_backend):
     serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
                       for proto in function_infos]
@@ -150,6 +151,7 @@ def create_two_input_user_defined_data_stream_function_from_protos(
         runtime_context,
         function_context,
         timer_context,
+        side_output_context,
         job_parameters,
         keyed_state_backend,
         operator_state_backend)
diff --git a/flink-python/pyflink/fn_execution/embedded/operations.py b/flink-python/pyflink/fn_execution/embedded/operations.py
index be7771f8aca..d4a251db973 100644
--- a/flink-python/pyflink/fn_execution/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/embedded/operations.py
@@ -66,6 +66,7 @@ class OneInputFunctionOperation(FunctionOperation):
                  runtime_context,
                  function_context,
                  timer_context,
+                 side_output_context,
                  job_parameters,
                  keyed_state_backend,
                  operator_state_backend):
@@ -75,6 +76,7 @@ class OneInputFunctionOperation(FunctionOperation):
                 runtime_context,
                 function_context,
                 timer_context,
+                side_output_context,
                 job_parameters,
                 keyed_state_backend,
                 operator_state_backend)
@@ -100,6 +102,7 @@ class TwoInputFunctionOperation(FunctionOperation):
                  runtime_context,
                  function_context,
                  timer_context,
+                 side_output_context,
                  job_parameters,
                  keyed_state_backend,
                  operator_state_backend):
@@ -109,6 +112,7 @@ class TwoInputFunctionOperation(FunctionOperation):
                 runtime_context,
                 function_context,
                 timer_context,
+                side_output_context,
                 job_parameters,
                 keyed_state_backend,
                 operator_state_backend)
diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
index a4e99e9a39b..f8f4a35abbe 100644
--- a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionI
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
@@ -268,19 +267,22 @@ public class PythonOperatorChainingOptimizer {
     @SuppressWarnings("unchecked")
     private static Transformation<?> createChainedTransformation(
             Transformation<?> upTransform, Transformation<?> downTransform) {
-        StreamOperator<?> upOperator =
-                ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
+        DataStreamPythonFunctionOperator<?> upOperator =
+                (DataStreamPythonFunctionOperator<?>)
+                        ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
 
-        StreamOperator<?> downOperator =
-                ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform)).getOperator();
+        DataStreamPythonFunctionOperator<?> downOperator =
+                (DataStreamPythonFunctionOperator<?>)
+                        ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform))
+                                .getOperator();
 
         assert arePythonOperatorsInSameExecutionEnvironment(upOperator, downOperator);
 
         final DataStreamPythonFunctionInfo upPythonFunctionInfo =
-                ((DataStreamPythonFunctionOperator) upOperator).getPythonFunctionInfo().copy();
+                upOperator.getPythonFunctionInfo().copy();
 
         final DataStreamPythonFunctionInfo downPythonFunctionInfo =
-                ((DataStreamPythonFunctionOperator) downOperator).getPythonFunctionInfo().copy();
+                downOperator.getPythonFunctionInfo().copy();
 
         DataStreamPythonFunctionInfo headPythonFunctionInfoOfDownOperator = downPythonFunctionInfo;
         while (headPythonFunctionInfoOfDownOperator.getInputs().length != 0) {
@@ -292,17 +294,10 @@ public class PythonOperatorChainingOptimizer {
                 new DataStreamPythonFunctionInfo[] {upPythonFunctionInfo});
 
         final DataStreamPythonFunctionOperator chainedOperator =
-                ((DataStreamPythonFunctionOperator) upOperator)
-                        .copy(
-                                downPythonFunctionInfo,
-                                ((DataStreamPythonFunctionOperator) downOperator)
-                                        .getProducedType());
-        if (chainedOperator instanceof AbstractExternalDataStreamPythonFunctionOperator) {
-            ((AbstractExternalDataStreamPythonFunctionOperator) chainedOperator)
-                    .addSideOutputTags(
-                            ((AbstractExternalDataStreamPythonFunctionOperator) downOperator)
-                                    .getSideOutputTags());
-        }
+                upOperator.copy(
+                        downPythonFunctionInfo,
+                        ((DataStreamPythonFunctionOperator) downOperator).getProducedType());
+        chainedOperator.addSideOutputTags(downOperator.getSideOutputTags());
 
         PhysicalTransformation<?> chainedTransformation;
         if (upOperator instanceof OneInputStreamOperator) {
@@ -411,11 +406,14 @@ public class PythonOperatorChainingOptimizer {
             return false;
         }
 
-        StreamOperator<?> upOperator =
-                ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
+        DataStreamPythonFunctionOperator<?> upOperator =
+                (DataStreamPythonFunctionOperator<?>)
+                        ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform)).getOperator();
 
-        StreamOperator<?> downOperator =
-                ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform)).getOperator();
+        DataStreamPythonFunctionOperator<?> downOperator =
+                (DataStreamPythonFunctionOperator<?>)
+                        ((SimpleOperatorFactory<?>) getOperatorFactory(downTransform))
+                                .getOperator();
 
         if (!arePythonOperatorsInSameExecutionEnvironment(upOperator, downOperator)) {
             return false;
@@ -435,7 +433,8 @@ public class PythonOperatorChainingOptimizer {
     }
 
     private static boolean arePythonOperatorsInSameExecutionEnvironment(
-            StreamOperator<?> upOperator, StreamOperator<?> downOperator) {
+            DataStreamPythonFunctionOperator<?> upOperator,
+            DataStreamPythonFunctionOperator<?> downOperator) {
         return upOperator instanceof AbstractExternalDataStreamPythonFunctionOperator
                         && downOperator instanceof AbstractExternalDataStreamPythonFunctionOperator
                 || upOperator instanceof AbstractEmbeddedDataStreamPythonFunctionOperator
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index e94b2c69e33..733b874859c 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
 import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
-import org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator;
 import org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator;
 import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -60,6 +59,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -150,11 +150,12 @@ public class PythonConfigUtil {
                 final Transformation<?> upTransform =
                         Iterables.getOnlyElement(sideTransform.getInputs());
                 if (PythonConfigUtil.isPythonDataStreamOperator(upTransform)) {
-                    final AbstractExternalDataStreamPythonFunctionOperator<?> upOperator =
-                            (AbstractExternalDataStreamPythonFunctionOperator<?>)
+                    final DataStreamPythonFunctionOperator<?> upOperator =
+                            (DataStreamPythonFunctionOperator<?>)
                                     ((SimpleOperatorFactory<?>) getOperatorFactory(upTransform))
                                             .getOperator();
-                    upOperator.addSideOutputTag(sideTransform.getOutputTag());
+                    upOperator.addSideOutputTags(
+                            Collections.singletonList(sideTransform.getOutputTag()));
                 }
             }
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
index 4b68f111f24..0afe77c5288 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/DataStreamPythonFunctionOperator.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Collection;
 
 /** Interface for Python DataStream operators. */
 @Internal
@@ -36,6 +39,12 @@ public interface DataStreamPythonFunctionOperator<OUT> extends ResultTypeQueryab
     /** Returns the underlying {@link DataStreamPythonFunctionInfo}. */
     DataStreamPythonFunctionInfo getPythonFunctionInfo();
 
+    /** Adds a collection of {@link OutputTag}s to the operator. */
+    void addSideOutputTags(Collection<OutputTag<?>> outputTags);
+
+    /** Gets the {@link OutputTag}s belonging to the operator. */
+    Collection<OutputTag<?>> getSideOutputTags();
+
     /**
      * Make a copy of the DataStreamPythonFunctionOperator with the given pythonFunctionInfo and
      * outputTypeInfo. This is used for chaining optimization which may need to update the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
index 75ed3e89718..356f4c4e212 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedDataStreamPythonFunctionOperator.java
@@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.operators.python.embedded;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -46,6 +50,8 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
     /** The serialized python function to be executed. */
     private final DataStreamPythonFunctionInfo pythonFunctionInfo;
 
+    private final Map<String, OutputTag<?>> sideOutputTags;
+
     /** The TypeInformation of output data. */
     protected final TypeInformation<OUT> outputTypeInfo;
 
@@ -56,6 +62,10 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
 
     protected transient TimestampedCollector<OUT> collector;
 
+    protected transient boolean hasSideOutput;
+
+    protected transient SideOutputContext sideOutputContext;
+
     public AbstractEmbeddedDataStreamPythonFunctionOperator(
             Configuration config,
             DataStreamPythonFunctionInfo pythonFunctionInfo,
@@ -63,10 +73,18 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
         super(config);
         this.pythonFunctionInfo = Preconditions.checkNotNull(pythonFunctionInfo);
         this.outputTypeInfo = Preconditions.checkNotNull(outputTypeInfo);
+        this.sideOutputTags = new HashMap<>();
     }
 
     @Override
     public void open() throws Exception {
+        hasSideOutput = !sideOutputTags.isEmpty();
+
+        if (hasSideOutput) {
+            sideOutputContext = new SideOutputContext();
+            sideOutputContext.open();
+        }
+
         super.open();
 
         outputDataConverter =
@@ -90,6 +108,18 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
         return pythonFunctionInfo;
     }
 
+    @Override
+    public void addSideOutputTags(Collection<OutputTag<?>> outputTags) {
+        for (OutputTag<?> outputTag : outputTags) {
+            sideOutputTags.put(outputTag.getId(), outputTag);
+        }
+    }
+
+    @Override
+    public Collection<OutputTag<?>> getSideOutputTags() {
+        return sideOutputTags.values();
+    }
+
     public Map<String, String> getJobParameters() {
         Map<String, String> jobParameters = new HashMap<>();
         if (numPartitions != null) {
@@ -104,4 +134,45 @@ public abstract class AbstractEmbeddedDataStreamPythonFunctionOperator<OUT>
         }
         return jobParameters;
     }
+
+    private class SideOutputContext {
+
+        private Map<String, byte[]> sideOutputTypeInfoPayloads = new HashMap<>();
+
+        private Map<String, PythonTypeUtils.DataConverter<Object, Object>>
+                sideOutputDataConverters = new HashMap<>();
+
+        @SuppressWarnings("unchecked")
+        public void open() {
+            for (Map.Entry<String, OutputTag<?>> entry : sideOutputTags.entrySet()) {
+                sideOutputTypeInfoPayloads.put(
+                        entry.getKey(), getSideOutputTypeInfoPayload(entry.getValue()));
+
+                sideOutputDataConverters.put(
+                        entry.getKey(),
+                        PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(
+                                (TypeInformation) entry.getValue().getTypeInfo()));
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        public void collectSideOutputById(String id, Object record) {
+            OutputTag<?> outputTag = sideOutputTags.get(id);
+            PythonTypeUtils.DataConverter<Object, Object> sideOutputDataConverter =
+                    sideOutputDataConverters.get(id);
+            collector.collect(
+                    outputTag, new StreamRecord(sideOutputDataConverter.toInternal(record)));
+        }
+
+        public Map<String, byte[]> getAllSideOutputTypeInfoPayloads() {
+            return sideOutputTypeInfoPayloads;
+        }
+
+        private byte[] getSideOutputTypeInfoPayload(OutputTag<?> outputTag) {
+            FlinkFnApi.TypeInfo outputTypeInfo =
+                    PythonTypeUtils.TypeInfoToProtoConverter.toTypeInfoProto(
+                            outputTag.getTypeInfo(), getRuntimeContext().getUserCodeClassLoader());
+            return outputTypeInfo.toByteArray();
+        }
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
index b02f3bfcb32..6b06d2e5ebf 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
@@ -117,6 +117,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
         interpreter.set("job_parameters", getJobParameters());
         interpreter.set("keyed_state_backend", getKeyedStateBackend());
         interpreter.set("operator_state_backend", getOperatorStateBackend());
+        interpreter.set("side_output_context", sideOutputContext);
 
         interpreter.exec(
                 "from pyflink.fn_execution.embedded.operation_utils import create_one_input_user_defined_data_stream_function_from_protos");
@@ -129,6 +130,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
                         + "runtime_context,"
                         + "function_context,"
                         + "timer_context,"
+                        + "side_output_context,"
                         + "job_parameters,"
                         + "keyed_state_backend,"
                         + "operator_state_backend)");
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
index b0767a309fe..6de7e6b8213 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
@@ -135,6 +135,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
         interpreter.set("runtime_context", getRuntimeContext());
         interpreter.set("function_context", getFunctionContext());
         interpreter.set("timer_context", getTimerContext());
+        interpreter.set("side_output_context", sideOutputContext);
         interpreter.set("keyed_state_backend", getKeyedStateBackend());
         interpreter.set("job_parameters", getJobParameters());
         interpreter.set("operator_state_backend", getOperatorStateBackend());
@@ -151,6 +152,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
                         + "runtime_context,"
                         + "function_context,"
                         + "timer_context,"
+                        + "side_output_context,"
                         + "job_parameters,"
                         + "keyed_state_backend,"
                         + "operator_state_backend)");
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
index c6b0d78116c..840c751afe0 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonCoProcessOperator.java
@@ -77,7 +77,7 @@ public class EmbeddedPythonCoProcessOperator<IN1, IN2, OUT>
                 inBatchExecutionMode(getKeyedStateBackend()),
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED),
-                false,
+                hasSideOutput,
                 config.get(STATE_CACHE_SIZE),
                 config.get(MAP_STATE_READ_CACHE_SIZE),
                 config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
index 65075c37ff9..edeee270208 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
@@ -105,7 +105,7 @@ public class EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>
                 inBatchExecutionMode(getKeyedStateBackend()),
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED),
-                false,
+                hasSideOutput,
                 config.get(STATE_CACHE_SIZE),
                 config.get(MAP_STATE_READ_CACHE_SIZE),
                 config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
index 76fdf58058a..d0db39d37cc 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
@@ -104,7 +104,7 @@ public class EmbeddedPythonKeyedProcessOperator<K, IN, OUT>
                 inBatchExecutionMode(getKeyedStateBackend()),
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED),
-                false,
+                hasSideOutput,
                 config.get(STATE_CACHE_SIZE),
                 config.get(MAP_STATE_READ_CACHE_SIZE),
                 config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
index efcceaddc29..acab1c2f531 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonProcessOperator.java
@@ -76,7 +76,7 @@ public class EmbeddedPythonProcessOperator<IN, OUT>
                 inBatchExecutionMode(getKeyedStateBackend()),
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED),
-                false,
+                hasSideOutput,
                 config.get(STATE_CACHE_SIZE),
                 config.get(MAP_STATE_READ_CACHE_SIZE),
                 config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
index 94c68045205..4756d538333 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
@@ -105,7 +105,7 @@ public class EmbeddedPythonWindowOperator<K, IN, OUT, W extends Window>
                 inBatchExecutionMode(getKeyedStateBackend()),
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED),
-                false,
+                hasSideOutput,
                 config.get(STATE_CACHE_SIZE),
                 config.get(MAP_STATE_READ_CACHE_SIZE),
                 config.get(MAP_STATE_WRITE_CACHE_SIZE));
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
index 31d1b8d5875..73712e895ee 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalDataStreamPythonFunctionOperator.java
@@ -117,17 +117,15 @@ public abstract class AbstractExternalDataStreamPythonFunctionOperator<OUT>
     // Side outputs
     // ----------------------------------------------------------------------
 
-    public void addSideOutputTag(OutputTag<?> outputTag) {
-        sideOutputTags.put(outputTag.getId(), outputTag);
-    }
-
     protected OutputTag<?> getOutputTagById(String id) {
         Preconditions.checkArgument(sideOutputTags.containsKey(id));
         return sideOutputTags.get(id);
     }
 
     public void addSideOutputTags(Collection<OutputTag<?>> outputTags) {
-        outputTags.forEach(this::addSideOutputTag);
+        for (OutputTag<?> outputTag : outputTags) {
+            sideOutputTags.put(outputTag.getId(), outputTag);
+        }
     }
 
     public Collection<OutputTag<?>> getSideOutputTags() {