Posted to commits@flink.apache.org by hx...@apache.org on 2022/06/30 02:22:47 UTC

[flink] branch master updated: [FLINK-27586][python] Support non-keyed co-broadcast processing in PyFlink

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dfdf7afb047 [FLINK-27586][python] Support non-keyed co-broadcast processing in PyFlink
dfdf7afb047 is described below

commit dfdf7afb047d1b8af581883bf208e4d8a30a116f
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Fri May 20 18:40:11 2022 +0800

    [FLINK-27586][python] Support non-keyed co-broadcast processing in PyFlink
    
    This closes #19878.
---
 flink-python/dev/lint-python.sh                    |   4 +-
 flink-python/docs/pyflink.datastream.rst           |   2 +
 flink-python/pyflink/datastream/__init__.py        |  18 +-
 flink-python/pyflink/datastream/data_stream.py     | 275 ++++++++++++++++++---
 flink-python/pyflink/datastream/functions.py       | 145 ++++++++++-
 .../pyflink/datastream/tests/test_data_stream.py   |  51 +++-
 .../pyflink/fn_execution/beam/beam_operations.py   |  26 +-
 .../fn_execution/beam/beam_operations_fast.pxd     |   1 +
 .../fn_execution/beam/beam_operations_fast.pyx     |  24 +-
 .../fn_execution/beam/beam_operations_slow.py      |  34 +--
 .../pyflink/fn_execution/datastream/operations.py  |  55 ++++-
 .../fn_execution/datastream/process_function.py    |  59 ++++-
 .../pyflink/fn_execution/flink_fn_execution_pb2.py |  86 ++++---
 .../pyflink/fn_execution/internal_state.py         |  14 +-
 flink-python/pyflink/fn_execution/state_impl.py    |   9 +-
 .../pyflink/proto/flink-fn-execution.proto         |   1 +
 .../apache/flink/python/util/PythonConfigUtil.java |  42 +++-
 .../PythonBatchCoBroadcastProcessOperator.java     |  80 ++++++
 .../python/PythonBroadcastStateTransformation.java |  90 +++++++
 ...thonBroadcastStateTransformationTranslator.java |  87 +++++++
 20 files changed, 981 insertions(+), 122 deletions(-)
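
For context, a minimal end-to-end sketch of the non-keyed co-broadcast API this commit adds,
condensed from the test in test_data_stream.py further below; the TagByRule class and the
"rules" names are illustrative, not part of the commit:

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment, BroadcastProcessFunction
    from pyflink.datastream.state import MapStateDescriptor

    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection([1, 2, 3, 4], type_info=Types.INT())
    rules = env.from_collection(
        [(0, "even"), (1, "odd")],
        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

    rule_desc = MapStateDescriptor(
        "rules", key_type_info=Types.INT(), value_type_info=Types.STRING())

    class TagByRule(BroadcastProcessFunction):
        def __init__(self, desc):
            self._desc = desc

        def process_element(self, value, ctx):
            # non-broadcast side: read-only view of the broadcast state
            state = ctx.get_broadcast_state(self._desc)
            if state.contains(value % 2):
                yield state.get(value % 2) + ":" + str(value)

        def process_broadcast_element(self, value, ctx):
            # broadcast side: read/write access to the broadcast state
            ctx.get_broadcast_state(self._desc).put(value[0], value[1])

    ds.connect(rules.broadcast(rule_desc)) \
        .process(TagByRule(rule_desc), output_type=Types.STRING()) \
        .print()
    env.execute("co_broadcast_sketch")

Elements arriving before their rule is broadcast are dropped in this sketch; the test below
buffers them instead, since the two inputs are not ordered relative to each other.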

diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index f3719faa477..4a39193f61f 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -304,7 +304,7 @@ function install_flake8() {
         fi
     fi
 
-    $CURRENT_DIR/install_command.sh -q flake8==3.7.9 2>&1 >/dev/null
+    $CURRENT_DIR/install_command.sh -q flake8==4.0.1 2>&1 >/dev/null
     if [ $? -ne 0 ]; then
         echo "pip install flake8 failed \
         please try to exec the script again.\
@@ -328,7 +328,7 @@ function install_sphinx() {
         fi
     fi
 
-    $CURRENT_DIR/install_command.sh -q Sphinx==2.4.4 Docutils==0.17.1 "Jinja2<3.1.0" 2>&1 >/dev/null
+    $CURRENT_DIR/install_command.sh -q Sphinx==3.5.4 Docutils==0.16 "Jinja2<3.1.0" 2>&1 >/dev/null
     if [ $? -ne 0 ]; then
         echo "pip install sphinx failed \
         please try to exec the script again.\
diff --git a/flink-python/docs/pyflink.datastream.rst b/flink-python/docs/pyflink.datastream.rst
index ee5126e630e..e9781265f03 100644
--- a/flink-python/docs/pyflink.datastream.rst
+++ b/flink-python/docs/pyflink.datastream.rst
@@ -26,12 +26,14 @@ Module contents
     :members:
     :undoc-members:
     :show-inheritance:
+    :inherited-members:
 
 pyflink.datastream.state module
 ------------------------------------
 .. automodule:: pyflink.datastream.state
     :members:
     :undoc-members:
+    :inherited-members:
 
 pyflink.datastream.connectors module
 ------------------------------------
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 8e5ea1443f4..be54f5bc301 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -35,6 +35,11 @@ Entry point classes of Flink DataStream API:
       Represent two connected streams of (possibly) different data types. Connected
       streams are useful for cases where operations on one stream directly affect the operations on
       the other stream, usually via shared state between the streams.
+    - :class:`BroadcastStream`:
+      Represent a stream with :class:`state.BroadcastState` (s).
+    - :class:`BroadcastConnectedStream`:
+      Represents the result of connecting a keyed or non-keyed stream with a
+      :class:`BroadcastStream` with :class:`state.BroadcastState` (s).
 
 Functions used to transform a :class:`DataStream` into another :class:`DataStream`:
 
@@ -70,6 +75,10 @@ Functions used to transform a :class:`DataStream` into another :class:`DataStrea
       information such as the current timestamp, the watermark, etc.
     - :class:`AggregateFunction`:
       Base class for a user-defined aggregate function.
+    - :class:`BroadcastProcessFunction`:
+      A function to be applied to a :class:`BroadcastConnectedStream` that connects
+      :class:`BroadcastStream`, i.e. a stream with broadcast state, with a non-keyed
+      :class:`DataStream`.
     - :class:`RuntimeContext`:
       Contains information about the context in which functions are executed. Each
       parallel instance of the function will have a context through which it can access static
@@ -145,7 +154,7 @@ Classes for state operations:
       A type of state that can be created to store the state of a :class:`BroadcastStream`. This
       state assumes that the same elements are sent to all instances of an operator.
     - :class:`state.ReadOnlyBroadcastState`:
-      A read-only view of the :class:`BroadcastState`.
+      A read-only view of the :class:`state.BroadcastState`.
     - :class:`state.StateTtlConfig`:
       Configuration of state TTL logic.
 
@@ -202,14 +211,14 @@ Other important classes:
 from pyflink.datastream.checkpoint_config import CheckpointConfig, ExternalizedCheckpointCleanup
 from pyflink.datastream.checkpointing_mode import CheckpointingMode
 from pyflink.datastream.data_stream import DataStream, KeyedStream, WindowedStream, \
-    ConnectedStreams, DataStreamSink
+    ConnectedStreams, DataStreamSink, BroadcastStream, BroadcastConnectedStream
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
 from pyflink.datastream.functions import (MapFunction, CoMapFunction, FlatMapFunction,
                                           CoFlatMapFunction, ReduceFunction, RuntimeContext,
                                           KeySelector, FilterFunction, Partitioner, SourceFunction,
                                           SinkFunction, CoProcessFunction, KeyedProcessFunction,
                                           KeyedCoProcessFunction, AggregateFunction, WindowFunction,
-                                          ProcessWindowFunction)
+                                          ProcessWindowFunction, BroadcastProcessFunction)
 from pyflink.datastream.slot_sharing_group import SlotSharingGroup, MemorySize
 from pyflink.datastream.state_backend import (StateBackend, MemoryStateBackend, FsStateBackend,
                                               RocksDBStateBackend, CustomStateBackend,
@@ -233,6 +242,8 @@ __all__ = [
     'KeyedStream',
     'WindowedStream',
     'ConnectedStreams',
+    'BroadcastStream',
+    'BroadcastConnectedStream',
     'DataStreamSink',
     'MapFunction',
     'CoMapFunction',
@@ -247,6 +258,7 @@ __all__ = [
     'WindowFunction',
     'ProcessWindowFunction',
     'AggregateFunction',
+    'BroadcastProcessFunction',
     'RuntimeContext',
     'TimerService',
     'CheckpointingMode',
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 35765e54d1a..b0ffa8fd8e5 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -18,7 +18,8 @@
 import typing
 import uuid
 from enum import Enum
-from typing import Callable, Union, List, cast, Optional
+from py4j.java_collections import ListConverter
+from typing import Callable, Union, List, cast, Optional, overload
 
 from pyflink.common import typeinfo, ExecutionConfig, Row
 from pyflink.common.typeinfo import RowTypeInfo, Types, TypeInformation, _from_java_type
@@ -39,11 +40,12 @@ from pyflink.datastream.functions import (_get_python_env, FlatMapFunction, MapF
                                           NullByteKeySelector, AllWindowFunction,
                                           InternalIterableAllWindowFunction,
                                           ProcessAllWindowFunction,
-                                          InternalIterableProcessAllWindowFunction)
+                                          InternalIterableProcessAllWindowFunction,
+                                          BroadcastProcessFunction)
 from pyflink.datastream.output_tag import OutputTag
 from pyflink.datastream.slot_sharing_group import SlotSharingGroup
 from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \
-    StateDescriptor, ReducingStateDescriptor, AggregatingStateDescriptor
+    StateDescriptor, ReducingStateDescriptor, AggregatingStateDescriptor, MapStateDescriptor
 from pyflink.datastream.utils import convert_to_python_obj
 from pyflink.datastream.window import (CountTumblingWindowAssigner, CountSlidingWindowAssigner,
                                        CountWindowSerializer, TimeWindowSerializer, Trigger,
@@ -52,7 +54,7 @@ from pyflink.datastream.window import (CountTumblingWindowAssigner, CountSliding
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
-           'DataStreamSink', 'CloseableIterator']
+           'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream']
 
 WINDOW_STATE_NAME = 'window-contents'
 
@@ -484,15 +486,36 @@ class DataStream(object):
         j_united_stream = self._j_data_stream.union(j_data_stream_arr)
         return DataStream(j_data_stream=j_united_stream)
 
+    @overload
     def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
+        pass
+
+    @overload
+    def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream':
+        pass
+
+    def connect(self, ds: Union['DataStream', 'BroadcastStream']) \
+            -> Union['ConnectedStreams', 'BroadcastConnectedStream']:
         """
-        Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of (possible)
-        different types with each other. The DataStreams connected using this operator can
-        be used with CoFunctions to apply joint transformations.
+        If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting
+        DataStream outputs of (possibly) different types with each other. The DataStreams connected
+        using this operator can be used with CoFunctions to apply joint transformations.
 
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The `ConnectedStreams`.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by
+        connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can
+        be created using the :meth:`broadcast` method. The resulting stream can be further processed
+        using the :meth:`BroadcastConnectedStream.process` method.
+
+        :param ds: The DataStream or BroadcastStream with which this stream will be connected.
+        :return: The ConnectedStreams or BroadcastConnectedStream.
+
+        .. versionchanged:: 1.16.0
+            Support connecting a BroadcastStream
         """
+        if isinstance(ds, BroadcastStream):
+            return BroadcastConnectedStream(
+                self, ds, cast(BroadcastStream, ds).broadcast_state_descriptors
+            )
         return ConnectedStreams(self, ds)
 
     def shuffle(self) -> 'DataStream':
@@ -561,13 +584,52 @@ class DataStream(object):
         """
         return DataStream(self._j_data_stream.forward())
 
+    @overload
     def broadcast(self) -> 'DataStream':
+        pass
+
+    @overload
+    def broadcast(self, broadcast_state_descriptor: MapStateDescriptor,
+                  *other_broadcast_state_descriptors: MapStateDescriptor) -> 'BroadcastStream':
+        pass
+
+    def broadcast(self, broadcast_state_descriptor: Optional[MapStateDescriptor] = None,
+                  *other_broadcast_state_descriptors: MapStateDescriptor) \
+            -> Union['DataStream', 'BroadcastStream']:
         """
         Sets the partitioning of the DataStream so that the output elements are broadcasted to every
         parallel instance of the next operation.
 
-        :return: The DataStream with broadcast partitioning set.
-        """
+        If :class:`~state.MapStateDescriptor` s are passed in, it returns a
+        :class:`BroadcastStream` with :class:`~state.BroadcastState` s implicitly created as the
+        descriptors specified.
+
+        Example:
+        ::
+
+            >>> map_state_desc1 = MapStateDescriptor("state1", Types.INT(), Types.INT())
+            >>> map_state_desc2 = MapStateDescriptor("state2", Types.INT(), Types.STRING())
+            >>> broadcast_stream = ds1.broadcast(map_state_desc1, map_state_desc2)
+            >>> broadcast_connected_stream = ds2.connect(broadcast_stream)
+
+        :param broadcast_state_descriptor: the first MapStateDescriptor describing BroadcastState.
+        :param other_broadcast_state_descriptors: the rest of MapStateDescriptors describing
+            BroadcastStates, if any.
+        :return: The DataStream with broadcast partitioning set or a BroadcastStream which can be
+            used in :meth:`connect` to create a BroadcastConnectedStream for further processing of
+            the elements.
+
+        .. versionchanged:: 1.16.0
+            Support returning a BroadcastStream
+        """
+        if broadcast_state_descriptor is not None:
+            args = [broadcast_state_descriptor]
+            args.extend(other_broadcast_state_descriptors)
+            for arg in args:
+                if not isinstance(arg, MapStateDescriptor):
+                    raise TypeError("broadcast_state_descriptor must be MapStateDescriptor")
+            broadcast_state_descriptors = list(args)  # type: List[MapStateDescriptor]
+            return BroadcastStream(cast(DataStream, self.broadcast()), broadcast_state_descriptors)
         return DataStream(self._j_data_stream.broadcast())
 
     def process(self, func: ProcessFunction, output_type: TypeInformation = None) -> 'DataStream':
@@ -1571,6 +1633,28 @@ class KeyedStream(DataStream):
     def union(self, *streams) -> 'DataStream':
         return self._values().union(*streams)
 
+    @overload
+    def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
+        pass
+
+    @overload
+    def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream':
+        pass
+
+    def connect(self, ds: Union['DataStream', 'BroadcastStream']) \
+            -> Union['ConnectedStreams', 'BroadcastConnectedStream']:
+        """
+        If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting
+        DataStream outputs of (possibly) different types with each other. The DataStreams connected
+        using this operator can be used with CoFunctions to apply joint transformations.
+
+        Currently, connect(BroadcastStream) is not supported.
+
+        :param ds: The DataStream with which this stream will be connected.
+        :return: The ConnectedStreams.
+        """
+        return super().connect(ds)
+
     def shuffle(self) -> 'DataStream':
         raise Exception('Cannot override partitioning for KeyedStream.')
 
@@ -1586,7 +1670,10 @@ class KeyedStream(DataStream):
     def forward(self) -> 'DataStream':
         raise Exception('Cannot override partitioning for KeyedStream.')
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args):
+        """
+        Not supported, partitioning for KeyedStream cannot be overridden.
+        """
         raise Exception('Cannot override partitioning for KeyedStream.')
 
     def partition_custom(self, partitioner: Union[Callable, Partitioner],
@@ -2277,6 +2364,130 @@ class ConnectedStreams(object):
         return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
 
 
+class BroadcastStream(object):
+    """
+    A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This can be created by any
+    stream using the :meth:`DataStream.broadcast` method and implicitly creates states where the
+    user can store elements of the created :class:`BroadcastStream`. (see
+    :class:`BroadcastConnectedStream`).
+    Note that no further operation can be applied to these streams. The only available option is
+    to connect them with a keyed or non-keyed stream, using the :meth:`KeyedStream.connect` and the
+    :meth:`DataStream.connect` respectively. Applying these methods will result in a
+    :class:`BroadcastConnectedStream` for further processing.
+    """
+
+    def __init__(
+        self,
+        input_stream: "DataStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.input_stream = input_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+    """
+    A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream with
+    a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of
+    :class:`ConnectedStreams` these streams are useful for cases where operations on one stream
+    directly affect the operations on the other stream, usually via shared state between the
+    streams.
+    An example for the use of such connected streams would be to apply rules that change over time
+    onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will
+    store them in the broadcast state, while the other stream will contain the elements to apply the
+    rules to. By broadcasting the rules, these will be available in all parallel instances, and can
+    be applied to all partitions of the other stream.
+    """
+
+    def __init__(
+        self,
+        non_broadcast_stream: "DataStream",
+        broadcast_stream: "BroadcastStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.non_broadcast_stream = non_broadcast_stream
+        self.broadcast_stream = broadcast_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+    def process(
+        self,
+        func: BroadcastProcessFunction,
+        output_type: TypeInformation = None,
+    ) -> "DataStream":
+        """
+        Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or
+        :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` on them,
+        thereby creating a transformed output stream.
+
+        :param func: The :class:`BroadcastProcessFunction` that is called for each element in the
+            stream.
+        :param output_type: The type of the output elements, should be
+            :class:`common.TypeInformation` or list (implicit :class:`RowTypeInfo`) or None (
+            implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+        :return: The transformed :class:`DataStream`.
+        """
+        if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream():
+            raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream")
+
+        j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation()
+        j_input_transformation2 = (
+            self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+        )
+
+        if output_type is None:
+            output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
+        elif isinstance(output_type, list):
+            output_type_info = RowTypeInfo(output_type)
+        elif isinstance(output_type, TypeInformation):
+            output_type_info = output_type
+        else:
+            raise TypeError("output_type must be None, list or TypeInformation")
+        j_output_type = output_type_info.get_java_type_info()
+
+        from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
+
+        func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS  # type: ignore
+        func_name = "Co-Process-Broadcast"
+
+        gateway = get_gateway()
+        JPythonBroadcastStateTransformation = (
+            gateway.jvm.org.apache.flink.streaming.api.transformations.python
+        ).PythonBroadcastStateTransformation
+        j_state_names = ListConverter().convert(
+            [i.get_name() for i in self.broadcast_state_descriptors], gateway._gateway_client
+        )
+        j_state_descriptors = (
+            JPythonBroadcastStateTransformation.convertStateNamesToStateDescriptors(j_state_names)
+        )
+
+        j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
+        j_data_stream_python_function_info = _create_j_data_stream_python_function_info(
+            func, func_type
+        )
+        j_env = (
+            self.non_broadcast_stream.get_execution_environment()._j_stream_execution_environment
+        )
+
+        j_transformation = JPythonBroadcastStateTransformation(
+            func_name,
+            j_conf,
+            j_data_stream_python_function_info,
+            j_input_transformation1,
+            j_input_transformation2,
+            j_state_descriptors,
+            j_output_type,
+            j_env.getParallelism(),
+        )
+        j_env.addOperator(j_transformation)
+        JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+        j_data_stream = JPythonConfigUtil.createSingleOutputStreamOperator(j_env, j_transformation)
+
+        return DataStream(j_data_stream)
+
+    def _is_keyed_stream(self):
+        return isinstance(self.non_broadcast_stream, KeyedStream)
+
+
 def _get_one_input_stream_operator(data_stream: DataStream,
                                    func: Union[Function,
                                                FunctionWrapper,
@@ -2293,8 +2504,7 @@ def _get_one_input_stream_operator(data_stream: DataStream,
     """
 
     gateway = get_gateway()
-    import cloudpickle
-    serialized_func = cloudpickle.dumps(func)
+
     j_input_types = data_stream._j_data_stream.getTransformation().getOutputType()
     if output_type is None:
         output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
@@ -2303,15 +2513,8 @@ def _get_one_input_stream_operator(data_stream: DataStream,
     else:
         output_type_info = output_type
 
+    j_data_stream_python_function_info = _create_j_data_stream_python_function_info(func, func_type)
     j_output_type_info = output_type_info.get_java_type_info()
-
-    j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
-        bytearray(serialized_func),
-        _get_python_env())
-    j_data_stream_python_function_info = gateway.jvm.DataStreamPythonFunctionInfo(
-        j_data_stream_python_function,
-        func_type)
-
     j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
 
     from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
@@ -2365,10 +2568,7 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
     :param type_info: the data type of the function output data.
     :return: A Java operator which is responsible for execution user defined python function.
     """
-
     gateway = get_gateway()
-    import cloudpickle
-    serialized_func = cloudpickle.dumps(func)
 
     j_input_types1 = connected_streams.stream1._j_data_stream.getTransformation().getOutputType()
     j_input_types2 = connected_streams.stream2._j_data_stream.getTransformation().getOutputType()
@@ -2380,14 +2580,9 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
     else:
         output_type_info = type_info
 
+    j_data_stream_python_function_info = _create_j_data_stream_python_function_info(func, func_type)
     j_output_type_info = output_type_info.get_java_type_info()
-
-    j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
-        bytearray(serialized_func),
-        _get_python_env())
-    j_data_stream_python_function_info = gateway.jvm.DataStreamPythonFunctionInfo(
-        j_data_stream_python_function,
-        func_type)
+    j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
 
     from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
     if func_type == UserDefinedDataStreamFunction.CO_PROCESS:  # type: ignore
@@ -2397,8 +2592,6 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
     else:
         raise TypeError("Unsupported function type: %s" % func_type)
 
-    j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
-
     j_python_data_stream_function_operator = JTwoInputPythonFunctionOperator(
         j_conf,
         j_data_stream_python_function_info,
@@ -2409,6 +2602,20 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
     return j_python_data_stream_function_operator, j_output_type_info
 
 
+def _create_j_data_stream_python_function_info(
+    func: Union[Function, FunctionWrapper, WindowOperationDescriptor], func_type: int
+):  # returns a py4j DataStreamPythonFunctionInfo, not bytes
+    gateway = get_gateway()
+
+    import cloudpickle
+    serialized_func = cloudpickle.dumps(func)
+
+    j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
+        bytearray(serialized_func), _get_python_env()
+    )
+    return gateway.jvm.DataStreamPythonFunctionInfo(j_data_stream_python_function, func_type)
+
+
 class CloseableIterator(object):
     """
     Representing an Iterator that is also auto closeable.
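
As a quick reference for the overloads added above, the result type now depends on the
arguments; a minimal sketch, assuming ds and other are non-keyed DataStreams and desc is a
MapStateDescriptor:

    desc = MapStateDescriptor(
        "rules", key_type_info=Types.INT(), value_type_info=Types.STRING())

    plain = ds.broadcast()                 # DataStream with broadcast partitioning
    b_stream = ds.broadcast(desc)          # BroadcastStream carrying the descriptor
    connected = other.connect(plain)       # ConnectedStreams, as before
    b_connected = other.connect(b_stream)  # BroadcastConnectedStream, new in this commit
    # ds.broadcast("nope")                 # would raise TypeError, per the isinstance
    #                                      # check in broadcast() above
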
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index 9ee869154ac..99ab7edbf84 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -22,7 +22,7 @@ from typing import Union, Any, Generic, TypeVar, Iterable
 
 from pyflink.datastream.state import ValueState, ValueStateDescriptor, ListStateDescriptor, \
     ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
-    AggregatingStateDescriptor, AggregatingState
+    AggregatingStateDescriptor, AggregatingState, BroadcastState, ReadOnlyBroadcastState
 from pyflink.datastream.time_domain import TimeDomain
 from pyflink.datastream.timerservice import TimerService
 from pyflink.java_gateway import get_gateway
@@ -49,12 +49,16 @@ __all__ = [
     'WindowFunction',
     'AllWindowFunction',
     'ProcessWindowFunction',
-    'ProcessAllWindowFunction']
+    'ProcessAllWindowFunction',
+    'BroadcastProcessFunction',
+]
 
 
 W = TypeVar('W')
 W2 = TypeVar('W2')
 IN = TypeVar('IN')
+IN1 = TypeVar('IN1')
+IN2 = TypeVar('IN2')
 OUT = TypeVar('OUT')
 KEY = TypeVar('KEY')
 
@@ -1269,6 +1273,143 @@ class InternalIterableProcessWindowFunction(InternalWindowFunction[Iterable[IN],
         self._wrapped_function.clear(self._internal_context)
 
 
+class BaseBroadcastProcessFunction(Function):
+    """
+    The base class containing the functionality available to all broadcast process function. These
+    include the :class:`BroadcastProcessFunction`.
+    """
+
+    class BaseContext(ABC):
+        """
+        The base context available to all methods in a broadcast process function. This includes
+        :class:`BroadcastProcessFunction`.
+        """
+
+        @abstractmethod
+        def timestamp(self) -> int:
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+            This might be None, for example if the time characteristic of your program is
+            set to :attr:`TimeCharacteristic.ProcessingTime`.
+            """
+            pass
+
+        @abstractmethod
+        def current_processing_time(self) -> int:
+            """Returns the current processing time."""
+            pass
+
+        @abstractmethod
+        def current_watermark(self) -> int:
+            """Returns the current watermark."""
+            pass
+
+    class Context(BaseContext):
+        """
+        A base :class:`BaseContext` available to the broadcasted stream side of a
+        :class:`BroadcastConnectedStream`.
+        Apart from the basic functionality of a :class:`BaseContext`, this also allows getting and
+        updating the elements stored in the :class:`BroadcastState`. In other words, it gives read/
+        write access to the broadcast state.
+        """
+
+        @abstractmethod
+        def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> BroadcastState:
+            """
+            Fetches the :class:`BroadcastState` with the specified name.
+            :param state_descriptor: the :class:`MapStateDescriptor` of the state to be fetched.
+            :return: The required :class:`BroadcastState`.
+            """
+            pass
+
+    class ReadOnlyContext(BaseContext):
+        """
+        A :class:`BaseContext` available to the non-broadcasted stream side of a
+        :class:`BroadcastConnectedStream`.
+        Apart from the basic functionality of a :class:`BaseContext`, this also allows
+        read-only access to the elements stored in the broadcast state.
+        """
+
+        @abstractmethod
+        def get_broadcast_state(
+            self, state_descriptor: MapStateDescriptor
+        ) -> ReadOnlyBroadcastState:
+            """
+            Fetches a read-only view of the broadcast state with the specified name.
+            :param state_descriptor: the :class:`MapStateDescriptor` of the state to be fetched.
+            :return: The required read-only view of the broadcast state.
+            """
+            pass
+
+
+class BroadcastProcessFunction(BaseBroadcastProcessFunction, Generic[IN1, IN2, OUT]):
+    """
+    A function to be applied to a :class:`BroadcastConnectedStream` that connects
+    :class:`BroadcastStream`, i.e. a stream with broadcast state, with a non-keyed
+    :class:`DataStream`.
+
+    The stream with the broadcast state can be created using the :meth:`DataStream.broadcast`
+    method.
+
+    The user has to implement two methods:
+    * the :meth:`process_broadcast_element` which will be applied to each element on the broadcast
+    side, and
+    * the :meth:`process_element` which will be applied to the non-broadcast side.
+
+    The :meth:`process_broadcast_element` takes as argument (among others) a context that allows it
+    to read/write to the broadcast state, while the :meth:`process_element` has read-only access to
+    the broadcast state.
+
+    .. versionadded:: 1.16.0
+    """
+
+    class Context(BaseBroadcastProcessFunction.Context, ABC):
+        """
+        A :class:`BaseBroadcastProcessFunction.Context` available to the broadcast side of a
+        :class:`BroadcastConnectedStream`.
+        """
+        pass
+
+    class ReadOnlyContext(BaseBroadcastProcessFunction.ReadOnlyContext, ABC):
+        """
+        A :class:`BaseBroadcastProcessFunction.ReadOnlyContext` available to the non-keyed side of a
+        :class:`BroadcastConnectedStream` (if any).
+        """
+        pass
+
+    @abstractmethod
+    def process_element(self, value: IN1, ctx: ReadOnlyContext):
+        """
+        This method is called for each element in the (non-broadcast) :class:`DataStream`.
+        This function can output zero or more elements via the :code:`yield` statement and query
+        the current processing/event time. Since this is the non-keyed variant, there is no keyed
+        state to update; it only has read-only access to the broadcast state. The context is only
+        valid during the invocation of this method; do not store it.
+
+        :param value: The stream element.
+        :param ctx: A :class:`ReadOnlyContext` that allows querying the timestamp of the element,
+            querying the current processing/event time and reading the broadcast state. The context
+            is only valid during the invocation of this method, do not store it.
+        """
+        pass
+
+    @abstractmethod
+    def process_broadcast_element(self, value: IN2, ctx: Context):
+        """
+        This method is called for each element in the :class:`BroadcastStream`.
+        This function can output zero or more elements via the :code:`yield` statement, query the
+        current processing/event time, and also query and update the internal
+        :class:`BroadcastState`. These can be done through the provided :class:`Context`. The
+        context is only valid during the invocation of this method, do not store it.
+
+        :param value: The stream element.
+        :param ctx: A :class:`Context` that allows querying the timestamp of the element, querying
+            the current processing/event time and updating the broadcast state. The context is only
+            valid during the invocation of this method, do not store it.
+        """
+        pass
+
+
 class InternalIterableProcessAllWindowFunction(InternalWindowFunction[Iterable[IN], OUT, int, W]):
 
     def __init__(self, wrapped_function: ProcessAllWindowFunction):
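
To make the Context/ReadOnlyContext split above concrete, a hedged sketch of a subclass;
ThresholdFilter and the "limit" descriptor are illustrative, not part of the commit:

    from pyflink.common import Types
    from pyflink.datastream import BroadcastProcessFunction
    from pyflink.datastream.state import MapStateDescriptor

    LIMIT_DESC = MapStateDescriptor(
        "limit", key_type_info=Types.STRING(), value_type_info=Types.INT())

    class ThresholdFilter(BroadcastProcessFunction):

        def process_broadcast_element(self, value, ctx: BroadcastProcessFunction.Context):
            # broadcast side: full read/write access to the broadcast state
            ctx.get_broadcast_state(LIMIT_DESC).put("limit", value)

        def process_element(self, value, ctx: BroadcastProcessFunction.ReadOnlyContext):
            # non-broadcast side: read-only view of the same state
            state = ctx.get_broadcast_state(LIMIT_DESC)
            if state.contains("limit") and value <= state.get("limit"):
                yield value
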
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 05e25008d51..7f7f40d7d6e 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -19,6 +19,8 @@ import datetime
 import decimal
 import os
 import uuid
+from collections import defaultdict
+from typing import Tuple
 
 from pyflink.common import Row, Configuration
 from pyflink.common.time import Time
@@ -29,7 +31,8 @@ from pyflink.datastream.data_stream import DataStream
 from pyflink.datastream.functions import (AggregateFunction, CoMapFunction, CoFlatMapFunction,
                                           MapFunction, FilterFunction, FlatMapFunction,
                                           KeyedCoProcessFunction, KeyedProcessFunction, KeySelector,
-                                          ProcessFunction, ReduceFunction, CoProcessFunction)
+                                          ProcessFunction, ReduceFunction, CoProcessFunction,
+                                          BroadcastProcessFunction)
 from pyflink.datastream.output_tag import OutputTag
 from pyflink.datastream.state import (ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor,
                                       ReducingStateDescriptor, ReducingState, AggregatingState,
@@ -939,6 +942,52 @@ class DataStreamTests(object):
         side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
         self.assert_equals_sorted(side_expected, side_sink.get_results())
 
+    def test_co_broadcast_process(self):
+        ds = self.env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())  # type: DataStream
+        ds_broadcast = self.env.from_collection(
+            [(0, "a"), (1, "b")], type_info=Types.TUPLE([Types.INT(), Types.STRING()])
+        )  # type: DataStream
+
+        class MyBroadcastProcessFunction(BroadcastProcessFunction):
+            def __init__(self, map_state_desc):
+                self._map_state_desc = map_state_desc
+                self._cache = defaultdict(list)
+
+            def process_element(self, value: int, ctx: BroadcastProcessFunction.ReadOnlyContext):
+                ro_broadcast_state = ctx.get_broadcast_state(self._map_state_desc)
+                key = value % 2
+                if ro_broadcast_state.contains(key):
+                    if self._cache.get(key) is not None:
+                        for v in self._cache[key]:
+                            yield ro_broadcast_state.get(key) + str(v)
+                        self._cache[key].clear()
+                    yield ro_broadcast_state.get(key) + str(value)
+                else:
+                    self._cache[key].append(value)
+
+            def process_broadcast_element(
+                self, value: Tuple[int, str], ctx: BroadcastProcessFunction.Context
+            ):
+                key = value[0]
+                yield str(key) + value[1]
+                broadcast_state = ctx.get_broadcast_state(self._map_state_desc)
+                broadcast_state.put(key, value[1])
+                if self._cache.get(key) is not None:
+                    for v in self._cache[key]:
+                        yield value[1] + str(v)
+                    self._cache[key].clear()
+
+        map_state_desc = MapStateDescriptor(
+            "mapping", key_type_info=Types.INT(), value_type_info=Types.STRING()
+        )
+        ds.connect(ds_broadcast.broadcast(map_state_desc)).process(
+            MyBroadcastProcessFunction(map_state_desc), output_type=Types.STRING()
+        ).add_sink(self.test_sink)
+
+        self.env.execute("test_co_broadcast_process")
+        expected = ["0a", "0a", "1b", "1b", "a2", "a4", "b1", "b3", "b5"]
+        self.assert_equals_sorted(expected, self.test_sink.get_results())
+
 
 class StreamingModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
     def test_data_stream_name(self):
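
The buffering in MyBroadcastProcessFunction above exists because the broadcast and
non-broadcast inputs are not ordered relative to each other. A Flink-free sketch of that
pattern (plain dicts stand in for the broadcast state; none of this is PyFlink API):

    from collections import defaultdict

    state = {}                 # stand-in for the broadcast map state
    cache = defaultdict(list)  # elements seen before their rule arrived
    outputs = []

    def on_element(value):
        key = value % 2
        if key in state:
            outputs.extend(state[key] + str(v) for v in cache.pop(key, []))
            outputs.append(state[key] + str(value))
        else:
            cache[key].append(value)

    def on_broadcast(key, rule):
        outputs.append(str(key) + rule)
        state[key] = rule
        outputs.extend(rule + str(v) for v in cache.pop(key, []))

    on_element(1)         # no rule for key 1 yet: buffered
    on_broadcast(1, "b")  # emits "1b", then flushes the buffered "b1"
    on_element(3)         # rule present: emits "b3" immediately
    assert outputs == ["1b", "b1", "b3"]
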
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index 601c0f51082..3bf739d44cb 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -24,7 +24,7 @@ from apache_beam.utils import proto_utils
 from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.fn_execution.coders import from_proto, from_type_info_proto, TimeWindowCoder, \
     CountWindowCoder, FlattenRowCoder
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
+from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend, RemoteOperatorStateBackend
 
 import pyflink.fn_execution.datastream.operations as datastream_operations
 import pyflink.fn_execution.table.operations as table_operations
@@ -152,8 +152,18 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
         side_inputs=None,
         output_coders=[output_coders[tag] for tag in output_tags])
     name = common.NameContext(transform_proto.unique_name)
-
     serialized_fn = spec.serialized_fn
+
+    if isinstance(serialized_fn, flink_fn_execution_pb2.UserDefinedDataStreamFunction):
+        operator_state_backend = RemoteOperatorStateBackend(
+            factory.state_handler,
+            serialized_fn.state_cache_size,
+            serialized_fn.map_state_read_cache_size,
+            serialized_fn.map_state_write_cache_size,
+        )
+    else:
+        operator_state_backend = None
+
     if hasattr(serialized_fn, "key_type"):
         # keyed operation, need to create the KeyedStateBackend.
         row_schema = serialized_fn.key_type.row_schema
@@ -180,7 +190,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             factory.state_sampler,
             consumers,
             internal_operation_cls,
-            keyed_state_backend)
+            keyed_state_backend,
+            operator_state_backend,
+        )
     elif internal_operation_cls == datastream_operations.StatefulOperation:
         key_row_coder = from_type_info_proto(serialized_fn.key_type_info)
         keyed_state_backend = RemoteKeyedStateBackend(
@@ -197,7 +209,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             factory.state_sampler,
             consumers,
             internal_operation_cls,
-            keyed_state_backend)
+            keyed_state_backend,
+            operator_state_backend,
+        )
     else:
         return beam_operation_cls(
             name,
@@ -205,4 +219,6 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             factory.counter_factory,
             factory.state_sampler,
             consumers,
-            internal_operation_cls)
+            internal_operation_cls,
+            operator_state_backend,
+        )
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
index 9808d23bd4b..29e907aa1ea 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
@@ -53,6 +53,7 @@ cdef class FunctionOperation(Operation):
     cdef object process_element
     cdef object operation
     cdef object operation_cls
+    cdef object operator_state_backend
     cdef object _profiler
     cdef object generate_operation(self)
 
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index 03dbcf2d1d9..1bcea4bfd1f 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -110,7 +110,8 @@ cdef class FunctionOperation(Operation):
     each input element.
     """
 
-    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls):
+    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
+                 operator_state_backend):
         super(FunctionOperation, self).__init__(name, spec, counter_factory, sampler)
         consumer = consumers[DEFAULT_OUTPUT_TAG][0]
         if isinstance(consumer, DataOutputOperation):
@@ -125,6 +126,7 @@ cdef class FunctionOperation(Operation):
         self._output_processors = FunctionOperation._create_output_processors(consumers)  \
             # type: Dict[str, List[OutputProcessor]]
         self.operation_cls = operation_cls
+        self.operator_state_backend = operator_state_backend
         self.operation = self.generate_operation()
         self.process_element = self.operation.process_element
         self.operation.open()
@@ -227,24 +229,32 @@ cdef class FunctionOperation(Operation):
 
 
 cdef class StatelessFunctionOperation(FunctionOperation):
-    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls):
+    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
+                 operator_state_backend):
         super(StatelessFunctionOperation, self).__init__(
-            name, spec, counter_factory, sampler, consumers, operation_cls)
+            name, spec, counter_factory, sampler, consumers, operation_cls, operator_state_backend)
 
     cdef object generate_operation(self):
-        return self.operation_cls(self.spec.serialized_fn)
+        if self.operator_state_backend is not None:
+            return self.operation_cls(self.spec.serialized_fn, self.operator_state_backend)
+        else:
+            return self.operation_cls(self.spec.serialized_fn)
 
 
 cdef class StatefulFunctionOperation(FunctionOperation):
     def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
-                 keyed_state_backend):
+                 keyed_state_backend, operator_state_backend):
         self._keyed_state_backend = keyed_state_backend
         self._reusable_windowed_value = windowed_value.create(None, -1, None, None)
         super(StatefulFunctionOperation, self).__init__(
-            name, spec, counter_factory, sampler, consumers, operation_cls)
+            name, spec, counter_factory, sampler, consumers, operation_cls, operator_state_backend)
 
     cdef object generate_operation(self):
-        return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
+        if self.operator_state_backend is not None:
+            return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend,
+                                      self.operator_state_backend)
+        else:
+            return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
 
     cpdef void add_timer_info(self, timer_family_id, timer_info):
         # ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 8c85af43163..603ba4c3b55 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -71,12 +71,14 @@ class FunctionOperation(Operation):
     each input element.
     """
 
-    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls):
+    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
+                 operator_state_backend):
         super(FunctionOperation, self).__init__(name, spec, counter_factory, sampler)
         self._output_processors = self._create_output_processors(
             consumers
         )  # type: Dict[str, List[OutputProcessor]]
         self.operation_cls = operation_cls
+        self.operator_state_backend = operator_state_backend
         self.operation = self.generate_operation()
         self.process_element = self.operation.process_element
         self.operation.open()
@@ -176,34 +178,34 @@ class FunctionOperation(Operation):
 
 
 class StatelessFunctionOperation(FunctionOperation):
-    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls):
+    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
+                 operator_state_backend):
         super(StatelessFunctionOperation, self).__init__(
-            name, spec, counter_factory, sampler, consumers, operation_cls
+            name, spec, counter_factory, sampler, consumers, operation_cls, operator_state_backend
         )
 
     def generate_operation(self):
-        return self.operation_cls(self.spec.serialized_fn)
+        if self.operator_state_backend is not None:
+            return self.operation_cls(self.spec.serialized_fn, self.operator_state_backend)
+        else:
+            return self.operation_cls(self.spec.serialized_fn)
 
 
 class StatefulFunctionOperation(FunctionOperation):
-    def __init__(
-        self,
-        name,
-        spec,
-        counter_factory,
-        sampler,
-        consumers,
-        operation_cls,
-        keyed_state_backend,
-    ):
+    def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
+                 keyed_state_backend, operator_state_backend):
         self._keyed_state_backend = keyed_state_backend
         self._reusable_windowed_value = windowed_value.create(None, -1, None, None)
         super(StatefulFunctionOperation, self).__init__(
-            name, spec, counter_factory, sampler, consumers, operation_cls
+            name, spec, counter_factory, sampler, consumers, operation_cls, operator_state_backend
         )
 
     def generate_operation(self):
-        return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
+        if self.operator_state_backend is not None:
+            return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend,
+                                      self.operator_state_backend)
+        else:
+            return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
 
     def add_timer_info(self, timer_family_id: str, timer_info: TimerInfo):
         # ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/datastream/operations.py b/flink-python/pyflink/fn_execution/datastream/operations.py
index c13db669764..845ba039b3a 100644
--- a/flink-python/pyflink/fn_execution/datastream/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/operations.py
@@ -16,16 +16,20 @@
 # limitations under the License.
 ################################################################################
 import abc
+from typing import cast
 
 from pyflink.common import Row
 from pyflink.common.serializer import VoidNamespaceSerializer
 from pyflink.datastream import TimeDomain, RuntimeContext
+from pyflink.datastream.functions import BroadcastProcessFunction
 from pyflink.datastream.window import WindowOperationDescriptor
 from pyflink.fn_execution import pickle
 from pyflink.fn_execution.datastream.process_function import (
     InternalKeyedProcessFunctionOnTimerContext,
     InternalKeyedProcessFunctionContext,
     InternalProcessFunctionContext,
+    InternalBroadcastProcessFunctionContext,
+    InternalReadOnlyBroadcastProcessFunctionContext,
 )
 from pyflink.fn_execution.datastream.runtime_context import StreamingRuntimeContext
 from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
@@ -46,14 +50,17 @@ DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:ds:stateful_function:v1"
 
 
 class Operation(abc.ABC):
-    def __init__(self, serialized_fn):
+    def __init__(self, serialized_fn, operator_state_backend=None):
         if serialized_fn.metric_enabled:
             self.base_metric_group = GenericMetricGroup(None, None)
         else:
             self.base_metric_group = None
+        self.operator_state_backend = operator_state_backend
 
     def finish(self):
         self._update_gauge(self.base_metric_group)
+        if self.operator_state_backend is not None:
+            self.operator_state_backend.commit()
 
     def _update_gauge(self, base_metric_group):
         if base_metric_group is not None:
@@ -75,8 +82,8 @@ class Operation(abc.ABC):
 
 
 class StatelessOperation(Operation):
-    def __init__(self, serialized_fn):
-        super(StatelessOperation, self).__init__(serialized_fn)
+    def __init__(self, serialized_fn, operator_state_backend):
+        super(StatelessOperation, self).__init__(serialized_fn, operator_state_backend)
         (
             self.open_func,
             self.close_func,
@@ -86,6 +93,7 @@ class StatelessOperation(Operation):
             runtime_context=StreamingRuntimeContext.of(
                 serialized_fn.runtime_context, self.base_metric_group
             ),
+            operator_state_store=operator_state_backend,
         )
 
     def open(self):
@@ -99,8 +107,8 @@ class StatelessOperation(Operation):
 
 
 class StatefulOperation(Operation):
-    def __init__(self, serialized_fn, keyed_state_backend):
-        super(StatefulOperation, self).__init__(serialized_fn)
+    def __init__(self, serialized_fn, keyed_state_backend, operator_state_backend):
+        super(StatefulOperation, self).__init__(serialized_fn, operator_state_backend)
         self.keyed_state_backend = keyed_state_backend
         (
             self.open_func,
@@ -139,7 +147,7 @@ class StatefulOperation(Operation):
 
 
 def extract_stateless_function(
-    user_defined_function_proto, runtime_context: RuntimeContext
+    user_defined_function_proto, runtime_context: RuntimeContext, operator_state_store
 ):
     """
     Extracts user-defined-function from the proto representation of a
@@ -147,6 +155,7 @@ def extract_stateless_function(
 
     :param user_defined_function_proto: the proto representation of the Python :class:`Function`
     :param runtime_context: the streaming runtime context
+    :param operator_state_store: operator state store for getting broadcast states
     """
     from pyflink.fn_execution import flink_fn_execution_pb2
 
@@ -218,6 +227,40 @@ def extract_stateless_function(
 
             process_element_func = wrapped_func
 
+        elif func_type == UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS:
+            user_defined_func = cast(BroadcastProcessFunction, user_defined_func)
+            process_element = user_defined_func.process_element
+            process_broadcast_element = user_defined_func.process_broadcast_element
+            broadcast_ctx = InternalBroadcastProcessFunctionContext(
+                NonKeyedTimerServiceImpl(), operator_state_store
+            )
+            read_only_broadcast_ctx = InternalReadOnlyBroadcastProcessFunctionContext(
+                NonKeyedTimerServiceImpl(), operator_state_store
+            )
+
+            def wrapped_func(value):
+                # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK,
+                #   [isNormal, normalInput, broadcastInput]]
+                timestamp = value[0]
+                watermark = value[1]
+                broadcast_ctx.set_timestamp(timestamp)
+                cast(
+                    TimerServiceImpl, broadcast_ctx.timer_service()
+                ).advance_watermark(watermark)
+                read_only_broadcast_ctx.set_timestamp(timestamp)
+                cast(
+                    TimerServiceImpl, read_only_broadcast_ctx.timer_service()
+                ).advance_watermark(watermark)
+
+                data = value[2]
+                if data[0]:
+                    results = process_element(data[1], read_only_broadcast_ctx)
+                else:
+                    results = process_broadcast_element(data[2], broadcast_ctx)
+                yield from _emit_results(timestamp, watermark, results, has_side_output)
+
+            process_element_func = wrapped_func
+
         else:
             raise Exception("Unsupported function_type: " + str(func_type))
 
diff --git a/flink-python/pyflink/fn_execution/datastream/process_function.py b/flink-python/pyflink/fn_execution/datastream/process_function.py
index 8eefff3f92a..2441b1f43df 100644
--- a/flink-python/pyflink/fn_execution/datastream/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/process_function.py
@@ -15,10 +15,14 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from typing import cast
 
 from pyflink.datastream import TimerService, TimeDomain
 from pyflink.datastream.functions import KeyedProcessFunction, KeyedCoProcessFunction, \
-    ProcessFunction, CoProcessFunction
+    ProcessFunction, CoProcessFunction, BroadcastProcessFunction
+from pyflink.datastream.state import MapStateDescriptor, BroadcastState, ReadOnlyBroadcastState, \
+    OperatorStateStore
+from pyflink.fn_execution.internal_state import InternalBroadcastState
 
 
 class InternalKeyedProcessFunctionOnTimerContext(
@@ -99,3 +103,56 @@ class InternalProcessFunctionContext(ProcessFunction.Context, CoProcessFunction.
 
     def set_timestamp(self, ts: int):
         self._timestamp = ts
+
+
+class InternalBroadcastProcessFunctionContext(BroadcastProcessFunction.Context):
+    def __init__(self, timer_service: TimerService, operator_state_store: OperatorStateStore):
+        self._timer_service = timer_service
+        self._timestamp = None
+        self._operator_state_store = operator_state_store
+
+    def timer_service(self) -> TimerService:
+        return self._timer_service
+
+    def timestamp(self) -> int:
+        return self._timestamp
+
+    def current_processing_time(self) -> int:
+        return self._timer_service.current_processing_time()
+
+    def current_watermark(self) -> int:
+        return self._timer_service.current_watermark()
+
+    def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> BroadcastState:
+        return self._operator_state_store.get_broadcast_state(state_descriptor)
+
+    def set_timestamp(self, ts: int):
+        self._timestamp = ts
+
+
+class InternalReadOnlyBroadcastProcessFunctionContext(BroadcastProcessFunction.ReadOnlyContext):
+    def __init__(self, timer_service: TimerService, operator_state_store: OperatorStateStore):
+        self._timer_service = timer_service
+        self._timestamp = None
+        self._operator_state_store = operator_state_store
+
+    def timer_service(self) -> TimerService:
+        return self._timer_service
+
+    def timestamp(self) -> int:
+        return self._timestamp
+
+    def current_processing_time(self) -> int:
+        return self._timer_service.current_processing_time()
+
+    def current_watermark(self) -> int:
+        return self._timer_service.current_watermark()
+
+    def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> ReadOnlyBroadcastState:
+        return cast(
+            InternalBroadcastState,
+            self._operator_state_store.get_broadcast_state(state_descriptor)
+        ).to_read_only_broadcast_state()
+
+    def set_timestamp(self, ts: int):
+        self._timestamp = ts
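These two context classes are what a user-defined BroadcastProcessFunction sees at runtime: the read-only variant on the regular side and the writable one on the broadcast side. A minimal usage sketch (EnrichWithRules and the rules descriptor are illustrative, not part of this patch):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import BroadcastProcessFunction
    from pyflink.datastream.state import MapStateDescriptor

    RULES = MapStateDescriptor("rules", Types.STRING(), Types.STRING())

    class EnrichWithRules(BroadcastProcessFunction):

        def process_element(self, value, ctx):
            # Regular-side elements get read-only access to the broadcast state.
            rule = ctx.get_broadcast_state(RULES).get(value[0])
            if rule is not None:
                yield value, rule

        def process_broadcast_element(self, value, ctx):
            # Broadcast-side elements may update the broadcast state.
            ctx.get_broadcast_state(RULES).put(value[0], value[1])
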
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 80cbaab505d..e64fb497d50 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='flink-fn-execution.proto',
   package='org.apache.flink.fn_execution.v1',
   syntax='proto3',
-  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
 )
 
 
@@ -374,14 +374,18 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor(
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='REVISE_OUTPUT', index=5, number=100,
+      name='CO_BROADCAST_PROCESS', index=5, number=5,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='REVISE_OUTPUT', index=6, number=100,
       options=None,
       type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=6861,
-  serialized_end=6976,
+  serialized_start=6862,
+  serialized_end=7003,
 )
 _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
 
@@ -406,8 +410,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES = _descriptor.EnumD
   ],
   containing_type=None,
   options=None,
-  serialized_start=8538,
-  serialized_end=8636,
+  serialized_start=8565,
+  serialized_end=8663,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES)
 
@@ -424,8 +428,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY = _descri
   ],
   containing_type=None,
   options=None,
-  serialized_start=8638,
-  serialized_end=8680,
+  serialized_start=8665,
+  serialized_end=8707,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY)
 
@@ -450,8 +454,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8682,
-  serialized_end=8750,
+  serialized_start=8709,
+  serialized_end=8777,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE)
 
@@ -472,8 +476,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8752,
-  serialized_end=8826,
+  serialized_start=8779,
+  serialized_end=8853,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY)
 
@@ -490,8 +494,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _descriptor.EnumDescript
   ],
   containing_type=None,
   options=None,
-  serialized_start=8828,
-  serialized_end=8871,
+  serialized_start=8855,
+  serialized_end=8898,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC)
 
@@ -512,8 +516,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=9838,
-  serialized_end=9870,
+  serialized_start=9865,
+  serialized_end=9897,
 )
 _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
 
@@ -2039,7 +2043,7 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=5984,
-  serialized_end=6976,
+  serialized_end=7003,
 )
 
 
@@ -2076,8 +2080,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY = _
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7760,
-  serialized_end=7848,
+  serialized_start=7787,
+  serialized_end=7875,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY = _descriptor.Descriptor(
@@ -2106,8 +2110,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTR
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7850,
-  serialized_end=7925,
+  serialized_start=7877,
+  serialized_end=7952,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descriptor.Descriptor(
@@ -2160,8 +2164,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descript
       name='CleanupStrategy', full_name='org.apache.flink.fn_execution.v1.StateDescriptor.StateTTLConfig.CleanupStrategies.MapStrategiesEntry.CleanupStrategy',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=7928,
-  serialized_end=8536,
+  serialized_start=7955,
+  serialized_end=8563,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
@@ -2199,8 +2203,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7582,
-  serialized_end=8680,
+  serialized_start=7609,
+  serialized_end=8707,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
@@ -2260,8 +2264,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7111,
-  serialized_end=8871,
+  serialized_start=7138,
+  serialized_end=8898,
 )
 
 _STATEDESCRIPTOR = _descriptor.Descriptor(
@@ -2297,8 +2301,8 @@ _STATEDESCRIPTOR = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6979,
-  serialized_end=8871,
+  serialized_start=7006,
+  serialized_end=8898,
 )
 
 
@@ -2328,8 +2332,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9467,
-  serialized_end=9541,
+  serialized_start=9494,
+  serialized_end=9568,
 )
 
 _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -2358,8 +2362,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9543,
-  serialized_end=9610,
+  serialized_start=9570,
+  serialized_end=9637,
 )
 
 _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -2388,8 +2392,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9612,
-  serialized_end=9681,
+  serialized_start=9639,
+  serialized_end=9708,
 )
 
 _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2418,8 +2422,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9683,
-  serialized_end=9762,
+  serialized_start=9710,
+  serialized_end=9789,
 )
 
 _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2448,8 +2452,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9764,
-  serialized_end=9836,
+  serialized_start=9791,
+  serialized_end=9863,
 )
 
 _CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2524,8 +2528,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
       name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=8874,
-  serialized_end=9883,
+  serialized_start=8901,
+  serialized_end=9910,
 )
 
 _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/fn_execution/internal_state.py b/flink-python/pyflink/fn_execution/internal_state.py
index 5a0bd5627a6..59475f149ef 100644
--- a/flink-python/pyflink/fn_execution/internal_state.py
+++ b/flink-python/pyflink/fn_execution/internal_state.py
@@ -114,8 +114,20 @@ class InternalMapState(InternalKvState[N], MapState[K, V], ABC):
 
 
 class InternalReadOnlyBroadcastState(ReadOnlyBroadcastState[K, V], ABC):
+    """
+    The peer to :class:`ReadOnlyBroadcastState`.
+    """
     pass
 
 
 class InternalBroadcastState(InternalReadOnlyBroadcastState[K, V], BroadcastState[K, V], ABC):
-    pass
+    """
+    The peer to :class:`BroadcastState`.
+    """
+
+    @abstractmethod
+    def to_read_only_broadcast_state(self) -> InternalReadOnlyBroadcastState[K, V]:
+        """
+        Convert this state to a :class:`ReadOnlyBroadcastState` view over the same underlying state.
+        """
+        pass
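As the state_impl.py hunk below shows, the concrete implementation wraps the same underlying map state in a read-only view. A stripped-down sketch of that facade pattern, with purely illustrative class names:

    # Both views share one backing store, so updates written through the
    # writable state are immediately visible through the read-only view.
    class ReadOnlyView:
        def __init__(self, backing):
            self._backing = backing

        def get(self, key):
            return self._backing.get(key)

        def contains(self, key):
            return key in self._backing

    class WritableBroadcastStateSketch:
        def __init__(self):
            self._backing = {}

        def put(self, key, value):
            self._backing[key] = value

        def to_read_only_broadcast_state(self):
            return ReadOnlyView(self._backing)
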
diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py
index 1252508ccfc..64fe4b1b349 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -1266,7 +1266,14 @@ class SynchronousBroadcastRuntimeState(
         return SynchronousReadOnlyBroadcastRuntimeState(self._name, self._internal_map_state)
 
 
-class RemoteOperatorStateBackend(OperatorStateStore):
+class OperatorStateBackend(OperatorStateStore, ABC):
+
+    @abstractmethod
+    def commit(self):
+        pass
+
+
+class RemoteOperatorStateBackend(OperatorStateBackend):
     def __init__(
         self, state_handler, state_cache_size, map_state_read_cache_size, map_state_write_cache_size
     ):
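The new OperatorStateBackend ABC captures a small contract: a backend hands out named broadcast states and flushes pending changes on commit(). A hedged sketch of that contract with illustrative stand-in classes (not PyFlink's implementations):

    from abc import ABC, abstractmethod

    class BackendSketch(ABC):
        @abstractmethod
        def get_broadcast_state(self, name):
            ...

        @abstractmethod
        def commit(self):
            ...

    class InMemoryBackend(BackendSketch):
        def __init__(self):
            self._states = {}

        def get_broadcast_state(self, name):
            return self._states.setdefault(name, {})

        def commit(self):
            pass  # nothing to flush for a purely in-memory backend
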
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index 5b8ba5e9b42..83be0def5c1 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -346,6 +346,7 @@ message UserDefinedDataStreamFunction {
     KEYED_PROCESS = 2;
     KEYED_CO_PROCESS = 3;
     WINDOW = 4;
+    CO_BROADCAST_PROCESS = 5;
     REVISE_OUTPUT = 100;
   }
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 8ae55cc8760..ca466b9abd9 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -24,7 +24,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.python.PythonConfig;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.python.AbstractDataStreamPythonFunctionOperator;
@@ -35,16 +38,20 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator;
 import org.apache.flink.util.OutputTag;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -82,8 +89,7 @@ public class PythonConfigUtil {
         return (Configuration) configurationField.get(env);
     }
 
-    public static void configPythonOperator(StreamExecutionEnvironment env)
-            throws IllegalAccessException, NoSuchFieldException {
+    public static void configPythonOperator(StreamExecutionEnvironment env) throws Exception {
         final Configuration config =
                 extractPythonConfiguration(env.getCachedFiles(), env.getConfiguration());
 
@@ -103,6 +109,7 @@ public class PythonConfigUtil {
         }
 
         processSideOutput(env.getTransformations());
+        registerPythonBroadcastTransformationTranslator();
     }
 
     /** Extract the configurations which are used in the Python operators. */
@@ -240,6 +247,8 @@ public class PythonConfigUtil {
         } else if (transform instanceof AbstractMultipleInputTransformation) {
             return isPythonOperator(
                     ((AbstractMultipleInputTransformation<?>) transform).getOperatorFactory());
+        } else if (transform instanceof PythonBroadcastStateTransformation) {
+            return true;
         } else {
             return false;
         }
@@ -325,4 +334,33 @@ public class PythonConfigUtil {
             return Optional.of(inputTransformation);
         }
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static void registerPythonBroadcastTransformationTranslator() throws Exception {
+        final Field translatorMapField =
+                StreamGraphGenerator.class.getDeclaredField("translatorMap");
+        translatorMapField.setAccessible(true);
+        final Map<Class<? extends Transformation>, TransformationTranslator<?, ?>> translatorMap =
+                (Map<Class<? extends Transformation>, TransformationTranslator<?, ?>>)
+                        translatorMapField.get(null);
+        final Field underlyingMapField = translatorMap.getClass().getDeclaredField("m");
+        underlyingMapField.setAccessible(true);
+        final Map<Class<? extends Transformation>, TransformationTranslator<?, ?>> underlyingMap =
+                (Map<Class<? extends Transformation>, TransformationTranslator<?, ?>>)
+                        underlyingMapField.get(translatorMap);
+
+        underlyingMap.put(
+                PythonBroadcastStateTransformation.class,
+                new PythonBroadcastStateTransformationTranslator<>());
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static SingleOutputStreamOperator<?> createSingleOutputStreamOperator(
+            StreamExecutionEnvironment env, Transformation<?> transformation) throws Exception {
+        Constructor<SingleOutputStreamOperator> constructor =
+                SingleOutputStreamOperator.class.getDeclaredConstructor(
+                        StreamExecutionEnvironment.class, Transformation.class);
+        constructor.setAccessible(true);
+        return constructor.newInstance(env, transformation);
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchCoBroadcastProcessOperator.java
new file mode 100644
index 00000000000..6cc07f76956
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchCoBroadcastProcessOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function in BATCH mode; {@link PythonCoProcessOperator} is used in
+ * STREAMING mode. This operator forces the broadcast side to be fully consumed before it
+ * processes any data from the regular side.
+ *
+ * @param <IN1> The input type of the regular stream
+ * @param <IN2> The input type of the broadcast stream
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchCoBroadcastProcessOperator<IN1, IN2, OUT>
+        extends PythonCoProcessOperator<IN1, IN2, OUT>
+        implements BoundedMultiInput, InputSelectable {
+
+    private transient volatile boolean isBroadcastSideDone;
+
+    public PythonBatchCoBroadcastProcessOperator(
+            Configuration config,
+            DataStreamPythonFunctionInfo pythonFunctionInfo,
+            TypeInformation<IN1> inputTypeInfo1,
+            TypeInformation<IN2> inputTypeInfo2,
+            TypeInformation<OUT> outputTypeInfo) {
+        super(config, pythonFunctionInfo, inputTypeInfo1, inputTypeInfo2, outputTypeInfo);
+    }
+
+    @Override
+    public void endInput(int inputId) throws Exception {
+        if (inputId == 2) {
+            isBroadcastSideDone = true;
+        }
+    }
+
+    @Override
+    public InputSelection nextSelection() {
+        if (!isBroadcastSideDone) {
+            return InputSelection.SECOND;
+        } else {
+            return InputSelection.FIRST;
+        }
+    }
+
+    @Override
+    public void processElement1(StreamRecord<IN1> element) throws Exception {
+        Preconditions.checkState(
+                isBroadcastSideDone,
+                "Should not process regular input before broadcast side is done.");
+
+        super.processElement1(element);
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java
new file mode 100644
index 00000000000..951846d0e65
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Co-Broadcast-Process operation, which will be
+ * translated into different operators by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator}.
+ */
+public class PythonBroadcastStateTransformation<IN1, IN2, OUT>
+        extends AbstractBroadcastStateTransformation<IN1, IN2, OUT> {
+
+    private final Configuration configuration;
+    private final DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo;
+
+    public PythonBroadcastStateTransformation(
+            String name,
+            Configuration configuration,
+            DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
+            Transformation<IN1> regularInput,
+            Transformation<IN2> broadcastInput,
+            List<MapStateDescriptor<?, ?>> broadcastStateDescriptors,
+            TypeInformation<OUT> outTypeInfo,
+            int parallelism) {
+        super(
+                name,
+                regularInput,
+                broadcastInput,
+                broadcastStateDescriptors,
+                outTypeInfo,
+                parallelism);
+        this.configuration = configuration;
+        this.dataStreamPythonFunctionInfo = dataStreamPythonFunctionInfo;
+        updateManagedMemoryStateBackendUseCase(false);
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public DataStreamPythonFunctionInfo getDataStreamPythonFunctionInfo() {
+        return dataStreamPythonFunctionInfo;
+    }
+
+    public static List<MapStateDescriptor<ByteArrayWrapper, byte[]>>
+            convertStateNamesToStateDescriptors(Collection<String> names) {
+        List<MapStateDescriptor<ByteArrayWrapper, byte[]>> descriptors =
+                new ArrayList<>(names.size());
+        TypeSerializer<byte[]> byteArraySerializer =
+                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.createSerializer(
+                        new ExecutionConfig());
+        for (String name : names) {
+            descriptors.add(
+                    new MapStateDescriptor<>(
+                            name, ByteArrayWrapperSerializer.INSTANCE, byteArraySerializer));
+        }
+        return descriptors;
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java
new file mode 100644
index 00000000000..5106db25210
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators.python;
+
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.python.PythonBatchCoBroadcastProcessOperator;
+import org.apache.flink.streaming.api.operators.python.PythonCoProcessOperator;
+import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation;
+import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} that translates {@link
+ * PythonBroadcastStateTransformation} into {@link PythonCoProcessOperator} or {@link
+ * PythonBatchCoBroadcastProcessOperator}.
+ */
+public class PythonBroadcastStateTransformationTranslator<IN1, IN2, OUT>
+        extends AbstractTwoInputTransformationTranslator<
+                IN1, IN2, OUT, PythonBroadcastStateTransformation<IN1, IN2, OUT>> {
+
+    @Override
+    protected Collection<Integer> translateForBatchInternal(
+            PythonBroadcastStateTransformation<IN1, IN2, OUT> transformation, Context context) {
+        Preconditions.checkNotNull(transformation);
+        Preconditions.checkNotNull(context);
+
+        PythonBatchCoBroadcastProcessOperator<IN1, IN2, OUT> operator =
+                new PythonBatchCoBroadcastProcessOperator<>(
+                        transformation.getConfiguration(),
+                        transformation.getDataStreamPythonFunctionInfo(),
+                        transformation.getRegularInput().getOutputType(),
+                        transformation.getBroadcastInput().getOutputType(),
+                        transformation.getOutputType());
+
+        return translateInternal(
+                transformation,
+                transformation.getRegularInput(),
+                transformation.getBroadcastInput(),
+                SimpleOperatorFactory.of(operator),
+                null,
+                null,
+                null,
+                context);
+    }
+
+    @Override
+    protected Collection<Integer> translateForStreamingInternal(
+            PythonBroadcastStateTransformation<IN1, IN2, OUT> transformation, Context context) {
+        Preconditions.checkNotNull(transformation);
+        Preconditions.checkNotNull(context);
+
+        PythonCoProcessOperator<IN1, IN2, OUT> operator =
+                new PythonCoProcessOperator<>(
+                        transformation.getConfiguration(),
+                        transformation.getDataStreamPythonFunctionInfo(),
+                        transformation.getRegularInput().getOutputType(),
+                        transformation.getBroadcastInput().getOutputType(),
+                        transformation.getOutputType());
+
+        return translateInternal(
+                transformation,
+                transformation.getRegularInput(),
+                transformation.getBroadcastInput(),
+                SimpleOperatorFactory.of(operator),
+                null,
+                null,
+                null,
+                context);
+    }
+}
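Taken together, the Python contexts, the new protobuf function type, and the Java transformation/translator pair enable non-keyed co-broadcast pipelines along the following lines. The wiring assumes the DataStream broadcast/connect/process methods added earlier in this patch mirror the Java API; EnrichWithRules is the illustrative function sketched earlier:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.state import MapStateDescriptor

    env = StreamExecutionEnvironment.get_execution_environment()

    rules_desc = MapStateDescriptor("rules", Types.STRING(), Types.STRING())

    events = env.from_collection(
        [("a", 1), ("b", 2)], type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
    rules = env.from_collection(
        [("a", "keep")], type_info=Types.TUPLE([Types.STRING(), Types.STRING()]))

    # Broadcast the rule stream, connect it with the regular stream, and apply
    # the broadcast process function; in BATCH mode the broadcast side is fully
    # consumed first, per PythonBatchCoBroadcastProcessOperator above.
    events.connect(rules.broadcast(rules_desc)) \
        .process(EnrichWithRules()) \
        .print()

    env.execute("co-broadcast-example")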