Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/06 03:06:28 UTC

[GitHub] [flink] Vancior opened a new pull request, #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Vancior opened a new pull request, #19878:
URL: https://github.com/apache/flink/pull/19878

   
   
   ## What is the purpose of the change
   
   This PR implements non-keyed co-broadcast processing interfaces, including `BroadcastStream`, `BroadcastConnectedStream` and `BroadcastProcessFunction`.
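
To illustrate the pattern these interfaces target, here is a minimal pure-Python sketch that does not use PyFlink. `ToyBroadcastProcessFunction` and `run_connected` are hypothetical stand-ins invented for this illustration; only the method names `process_element` and `process_broadcast_element` appear in this PR. The sketch models a single parallel instance in which broadcast elements update shared state and regular elements only read it.

```python
from typing import Dict, Iterable, Iterator, List, Tuple


class ToyBroadcastProcessFunction:
    """Hypothetical stand-in for a broadcast process function (not the PyFlink class)."""

    def process_element(self, value: str, state: Dict[str, str]) -> Iterator[str]:
        # Regular (non-broadcast) side: only reads the broadcast state.
        for rule_name, keyword in state.items():
            if keyword in value:
                yield f"{rule_name}:{value}"

    def process_broadcast_element(
        self, value: Tuple[str, str], state: Dict[str, str]
    ) -> Iterator[str]:
        # Broadcast side: updates the shared state and emits nothing.
        rule_name, keyword = value
        state[rule_name] = keyword
        return iter(())


def run_connected(
    events: Iterable[Tuple[bool, object]], func: ToyBroadcastProcessFunction
) -> List[str]:
    """Feeds (is_broadcast, value) pairs to the function, in arrival order."""
    state: Dict[str, str] = {}  # models the broadcast state of one parallel instance
    out: List[str] = []
    for is_broadcast, value in events:
        if is_broadcast:
            out.extend(func.process_broadcast_element(value, state))
        else:
            out.extend(func.process_element(value, state))
    return out


events = [
    (False, "error: disk full"),   # arrives before any rule exists
    (True, ("errors", "error")),   # broadcast side installs a rule
    (False, "error: disk full"),   # now matches the rule
    (False, "info: started"),      # does not match
]
results = run_connected(events, ToyBroadcastProcessFunction())
```

The first regular element produces no output because no rule has been broadcast yet, which is the ordering caveat that broadcast-state users typically need to keep in mind.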
   
   
   ## Brief change log
   
   - add `BroadcastStream`, `BroadcastConnectedStream`, `BroadcastProcessFunction` and related internal classes
   - add a distinct `CO_BROADCAST_PROCESS` function type for broadcast processing
   - introduce `PythonBroadcastStateTransformation` and a corresponding translator that translates the Python broadcast process into different operators for batch or streaming execution
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   
   This change added tests and can be verified as follows:
   
   - `test_co_broadcast_process` in test_data_stream.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (Python generated doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangXingBo commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r906952038


##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Co-Broadcast-Process operation, which will be
+ * translated into different operations by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator}.
+ */
+public class PythonBroadcastStateTransformation<IN1, IN2, OUT>
+        extends AbstractBroadcastStateTransformation<IN1, IN2, OUT> {
+
+    private final Configuration configuration;
+    private final DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo;
+
+    public PythonBroadcastStateTransformation(
+            String name,
+            Configuration configuration,
+            DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
+            Transformation<IN1> regularInput,
+            Transformation<IN2> broadcastInput,
+            List<MapStateDescriptor<?, ?>> broadcastStateDescriptors,
+            TypeInformation<OUT> outTypeInfo,
+            int parallelism) {
+        super(
+                name,
+                regularInput,
+                broadcastInput,
+                broadcastStateDescriptors,
+                outTypeInfo,
+                parallelism);
+        this.configuration = configuration;
+        this.dataStreamPythonFunctionInfo = dataStreamPythonFunctionInfo;
+        updateManagedMemoryStateBackendUseCase(false);

Review Comment:
   Could this step be put in `PythonConfigUtil`, just like for the other Python transformations?





[GitHub] [flink] Vancior commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r897938554


##########
flink-python/pyflink/fn_execution/beam/beam_operations.py:
##########
@@ -151,9 +150,19 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
         input=None,
         side_inputs=None,
         output_coders=[output_coders[tag] for tag in output_tags])
+    serialized_fn = spec.serialized_fn
     name = common.NameContext(transform_proto.unique_name)
 
-    serialized_fn = spec.serialized_fn
+    if isinstance(serialized_fn, flink_fn_execution_pb2.UserDefinedDataStreamFunction):
+        operator_state_backend = RemoteOperatorStateBackend(
+            factory.state_handler,
+            serialized_fn.state_cache_size,
+            serialized_fn.map_state_read_cache_size,
+            serialized_fn.map_state_write_cache_size,
+        )
+    else:
+        operator_state_backend = None

Review Comment:
   Only the DataStream API can use operator state, so I guess that's okay.
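
The conditional under discussion can be sketched in isolation as follows. This is a hedged pure-Python illustration: `DataStreamFunctionProto`, `TableFunctionProto` and `ToyOperatorStateBackend` are hypothetical stand-ins for the real protobuf messages and for `RemoteOperatorStateBackend`, and the cache sizes are made-up numbers.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataStreamFunctionProto:
    """Hypothetical stand-in for UserDefinedDataStreamFunction."""
    state_cache_size: int
    map_state_read_cache_size: int
    map_state_write_cache_size: int


@dataclass
class TableFunctionProto:
    """Hypothetical stand-in for a function type that has no operator state."""


@dataclass
class ToyOperatorStateBackend:
    """Hypothetical stand-in for RemoteOperatorStateBackend."""
    state_cache_size: int
    map_state_read_cache_size: int
    map_state_write_cache_size: int


def create_operator_state_backend(serialized_fn: object) -> Optional[ToyOperatorStateBackend]:
    # Only DataStream functions carry the cache settings, so only they get a backend.
    if isinstance(serialized_fn, DataStreamFunctionProto):
        return ToyOperatorStateBackend(
            serialized_fn.state_cache_size,
            serialized_fn.map_state_read_cache_size,
            serialized_fn.map_state_write_cache_size,
        )
    # Every other function type runs without an operator state backend.
    return None


ds_backend = create_operator_state_backend(DataStreamFunctionProto(1000, 1000, 1000))
table_backend = create_operator_state_backend(TableFunctionProto())
```

Callers then have to tolerate `None`, which is acceptable as long as only DataStream operations ever ask for operator state.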





[GitHub] [flink] HuangXingBo commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r899713731


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +580,42 @@ def forward(self) -> 'DataStream':
         """
         return DataStream(self._j_data_stream.forward())
 
+    @overload
     def broadcast(self) -> 'DataStream':
+        pass
+
+    @overload
+    def broadcast(self, *args) -> 'BroadcastStream':

Review Comment:
   Add the type hint `MapStateDescriptor`?



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +580,42 @@ def forward(self) -> 'DataStream':
         """
         return DataStream(self._j_data_stream.forward())
 
+    @overload
     def broadcast(self) -> 'DataStream':
+        pass
+
+    @overload
+    def broadcast(self, *args) -> 'BroadcastStream':
+        pass
+
+    def broadcast(self, *args):

Review Comment:
   add the typehint too?



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1586,7 +1634,7 @@ def rebalance(self) -> 'DataStream':
     def forward(self) -> 'DataStream':
         raise Exception('Cannot override partitioning for KeyedStream.')
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args):

Review Comment:
   ditto



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -484,15 +486,32 @@ def union(self, *streams: 'DataStream') -> 'DataStream':
         j_united_stream = self._j_data_stream.union(j_data_stream_arr)
         return DataStream(j_data_stream=j_united_stream)
 
+    @overload
     def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
+        pass
+
+    @overload
+    def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream':
+        pass
+
+    def connect(self, ds):

Review Comment:
   Add the type hint `Union['DataStream', 'BroadcastStream']`?
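
A minimal sketch of what this could look like, assuming toy classes that share only their names with the PyFlink ones: the `@overload` stubs tell a type checker which return type goes with which argument type, while the single runtime implementation carries the `Union` hints and does the actual dispatch.

```python
from typing import Union, overload


class ConnectedStreams:
    """Toy stand-in; stores the two connected streams."""
    def __init__(self, stream1, stream2):
        self.stream1, self.stream2 = stream1, stream2


class BroadcastStream:
    """Toy stand-in; wraps the stream that was broadcast."""
    def __init__(self, input_stream):
        self.input_stream = input_stream


class BroadcastConnectedStream:
    """Toy stand-in; pairs a regular stream with a broadcast stream."""
    def __init__(self, non_broadcast_stream, broadcast_stream):
        self.non_broadcast_stream = non_broadcast_stream
        self.broadcast_stream = broadcast_stream


class DataStream:
    @overload
    def connect(self, ds: 'DataStream') -> 'ConnectedStreams': ...

    @overload
    def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream': ...

    def connect(
        self, ds: Union['DataStream', 'BroadcastStream']
    ) -> Union['ConnectedStreams', 'BroadcastConnectedStream']:
        # The overloads above exist only for type checkers; this body runs.
        if isinstance(ds, BroadcastStream):
            return BroadcastConnectedStream(self, ds)
        return ConnectedStreams(self, ds)


left, right = DataStream(), DataStream()
plain = left.connect(right)
with_broadcast = left.connect(BroadcastStream(right))
```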





[GitHub] [flink] HuangXingBo closed pull request #19878: [FLINK-27586][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo closed pull request #19878: [FLINK-27586][python] Support non-keyed co-broadcast processing
URL: https://github.com/apache/flink/pull/19878




[GitHub] [flink] Vancior commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r897902264


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1586,7 +1618,7 @@ def rebalance(self) -> 'DataStream':
     def forward(self) -> 'DataStream':
         raise Exception('Cannot override partitioning for KeyedStream.')
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args) -> Union['DataStream', 'BroadcastStream']:

Review Comment:
   Yes, that will be another PR.





[GitHub] [flink] HuangXingBo commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r895283506


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -484,15 +486,25 @@ def union(self, *streams: 'DataStream') -> 'DataStream':
         j_united_stream = self._j_data_stream.union(j_data_stream_arr)
         return DataStream(j_data_stream=j_united_stream)
 
-    def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
-        """
-        Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of (possible)
-        different types with each other. The DataStreams connected using this operator can
-        be used with CoFunctions to apply joint transformations.
-
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The `ConnectedStreams`.
-        """
+    def connect(
+        self, ds: Union["DataStream", "BroadcastStream"]
+    ) -> Union["ConnectedStreams", "BroadcastConnectedStream"]:
+        """
+        If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting
+        DataStream outputs of (possible) different types with each other. The DataStreams connected
+        using this operator can be used with CoFunctions to apply joint transformations.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by

Review Comment:
   add a blank line



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
 
 
+class BroadcastStream(object):
+    """
+    A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This can be created by any
+    stream using the :meth:`DataStream.broadcast` method and implicitly creates states where the
+    user can store elements of the created :class:`BroadcastStream`. (see
+    :class:`BroadcastConnectedStream`).
+    Note that no further operation can be applied to these streams. The only available option is
+    to connect them with a keyed or non-keyed stream, using the :meth:`KeyedStream.connect` and the
+    :meth:`DataStream.connect` respectively. Applying these methods will result in a
+    :class:`BroadcastConnectedStream` for further processing.
+    """
+
+    def __init__(
+        self,
+        input_stream: "DataStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.input_stream = input_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+    """
+    A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, with
+    a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of
+    :class:`ConnectedStreams` these streams are useful for cases where operations on one stream
+    directly affect the operations on the other stream, usually via shared state between the
+    streams.
+    An example for the use of such connected streams would be to apply rules that change over time
+    onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will
+    store them in the broadcast state, while the other stream will contain the elements to apply the
+    rules to. By broadcasting the rules, these will be available in all parallel instances, and can
+    be applied to all partitions of the other stream.
+    """
+
+    def __init__(
+        self,
+        non_broadcast_stream: "DataStream",
+        broadcast_stream: "BroadcastStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.non_broadcast_stream = non_broadcast_stream
+        self.broadcast_stream = broadcast_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+    def process(
+        self,
+        func: BroadcastProcessFunction,
+        output_type: TypeInformation = None,
+    ) -> "DataStream":
+        """
+        Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or
+        :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` or on them,
+        thereby creating a transformed output stream.
+
+        :param func: The :class:`BroadcastProcessFunction` that is called for each element in the
+            stream.
+        :param output_type: The type of the output elements, should be
+            :class:`common.TypeInformation` or list (implicit :class:`RowTypeInfo`) or None (
+            implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+        :return: The transformed :class:`DataStream`.
+        """
+        if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream():
+            raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream")
+
+        j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation()
+        j_input_transformation2 = (
+            self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+        )
+
+        if output_type is None:
+            output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
+        elif isinstance(output_type, list):

Review Comment:
   ```suggestion
           elif isinstance(output_type, List):
   ```
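
For context, the branch being discussed resolves the user-supplied `output_type` into a type-information object. The sketch below reproduces that dispatch with hypothetical toy classes (`TypeInformation`, `RowTypeInfo` and `PickledBytesTypeInfo` here are stand-ins, not the PyFlink implementations). It also shows that `isinstance` accepts the unsubscripted `typing.List` alias and treats it like the built-in `list`, so either spelling selects the same branch.

```python
from typing import List, Optional, Union


class TypeInformation:
    """Toy stand-in for the type-information base class."""


class PickledBytesTypeInfo(TypeInformation):
    """Toy stand-in for the default pickled-bytes type."""


class RowTypeInfo(TypeInformation):
    """Toy stand-in for a row type built from a list of field types."""
    def __init__(self, field_types: list):
        self.field_types = field_types


def resolve_output_type(
    output_type: Optional[Union[TypeInformation, list]] = None
) -> TypeInformation:
    if output_type is None:
        # No hint given: fall back to pickled bytes.
        return PickledBytesTypeInfo()
    elif isinstance(output_type, list):
        # A plain list of field types is shorthand for a row type.
        return RowTypeInfo(output_type)
    elif isinstance(output_type, TypeInformation):
        return output_type
    raise TypeError("output_type must be None, list or TypeInformation")


default_info = resolve_output_type()
row_info = resolve_output_type([TypeInformation(), TypeInformation()])
explicit = TypeInformation()
same_info = resolve_output_type(explicit)

# The unsubscripted typing alias behaves like the built-in in isinstance checks.
list_alias_matches = isinstance([1, 2], List)
```

Subscripted forms such as `List[int]` are a different matter: they cannot be used with `isinstance` and raise `TypeError`.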



##########
flink-python/pyflink/fn_execution/datastream/operations.py:
##########
@@ -218,6 +227,37 @@ def wrapped_func(value):
 
             process_element_func = wrapped_func
 
+        elif func_type == UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS:
+            user_defined_func = cast(BroadcastProcessFunction, user_defined_func)
+            process_element = user_defined_func.process_element
+            process_broadcast_element = user_defined_func.process_broadcast_element
+            broadcast_ctx = InternalBroadcastProcessFunctionContext(
+                NonKeyedTimerServiceImpl(), operator_state_store
+            )
+            read_only_broadcast_ctx = InternalReadOnlyBroadcastProcessFunctionContext(
+                NonKeyedTimerServiceImpl(), operator_state_store
+            )
+
+            def wrapped_func(value):
+                # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK,
+                #   [isNormal, broadcastInput, normalInput]]
+                timestamp = value[0]
+                watermark = value[1]
+                broadcast_ctx.set_timestamp(timestamp)
+                broadcast_ctx.timer_service().advance_watermark(watermark)

Review Comment:
   ```suggestion
                   cast(TimerServiceImpl, broadcast_ctx.timer_service()).advance_watermark(watermark)
   ```
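
The value layout described in the comment of `wrapped_func` can be unpacked as in the following sketch. `ToyTimerService` and `ToyContext` are hypothetical stand-ins, a single context is used instead of the two in the PR, and it is an assumption of this sketch that a true `isNormal` flag selects the normal input. Note that `typing.cast` does nothing at runtime; it only tells a type checker that the returned timer service has an `advance_watermark` method.

```python
from typing import Any, Callable, Tuple, cast


class ToyTimerService:
    """Hypothetical timer service that just remembers the latest watermark."""
    def __init__(self) -> None:
        self.watermark = None

    def advance_watermark(self, watermark: int) -> None:
        self.watermark = watermark


class ToyContext:
    """Hypothetical context holding the current timestamp and a timer service."""
    def __init__(self) -> None:
        self.timestamp = None
        self._timer_service = ToyTimerService()

    def set_timestamp(self, timestamp: int) -> None:
        self.timestamp = timestamp

    def timer_service(self) -> object:
        # Declared loosely on purpose, so that the cast below is meaningful.
        return self._timer_service


def make_wrapped_func(
    process_element: Callable[[Any, ToyContext], Any],
    process_broadcast_element: Callable[[Any, ToyContext], Any],
) -> Tuple[Callable[[Any], Any], ToyContext]:
    ctx = ToyContext()

    def wrapped_func(value):
        # value = [timestamp, watermark, [is_normal, broadcast_input, normal_input]]
        timestamp, watermark, (is_normal, broadcast_input, normal_input) = value
        ctx.set_timestamp(timestamp)
        cast(ToyTimerService, ctx.timer_service()).advance_watermark(watermark)
        if is_normal:
            return process_element(normal_input, ctx)
        return process_broadcast_element(broadcast_input, ctx)

    return wrapped_func, ctx


wrapped, ctx = make_wrapped_func(
    lambda v, c: ("normal", v),
    lambda v, c: ("broadcast", v),
)
normal_result = wrapped([10, 5, [True, None, "a"]])
broadcast_result = wrapped([11, 6, [False, "rule", None]])
```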



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1586,7 +1618,7 @@ def rebalance(self) -> 'DataStream':
     def forward(self) -> 'DataStream':
         raise Exception('Cannot override partitioning for KeyedStream.')
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args) -> Union['DataStream', 'BroadcastStream']:

Review Comment:
   Are we not going to support broadcast on `KeyedStream`?
   



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +573,33 @@ def forward(self) -> 'DataStream':
         """
         return DataStream(self._j_data_stream.forward())
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args) -> Union["DataStream", "BroadcastStream"]:

Review Comment:
   What about using `typing.overload`? https://docs.python.org/3/library/typing.html#typing.overload
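
A sketch of how `typing.overload` could express the two call shapes of `broadcast`, using toy classes that only borrow the PyFlink names. One assumption goes beyond the diff: the second overload requires at least one descriptor (named `broadcast_state_descriptor` here) so that the two signatures do not overlap for a type checker.

```python
from typing import List, Union, overload


class MapStateDescriptor:
    """Toy stand-in that only carries a state name."""
    def __init__(self, name: str):
        self.name = name


class BroadcastStream:
    """Toy stand-in pairing a stream with its broadcast state descriptors."""
    def __init__(self, input_stream: 'DataStream', descriptors: List[MapStateDescriptor]):
        self.input_stream = input_stream
        self.broadcast_state_descriptors = descriptors


class DataStream:
    def __init__(self, partitioning: str = "forward"):
        self.partitioning = partitioning

    @overload
    def broadcast(self) -> 'DataStream': ...

    @overload
    def broadcast(
        self,
        broadcast_state_descriptor: MapStateDescriptor,
        *other_descriptors: MapStateDescriptor,
    ) -> 'BroadcastStream': ...

    def broadcast(self, *args: MapStateDescriptor) -> Union['DataStream', 'BroadcastStream']:
        if args:
            # Descriptors given: wrap this stream together with its broadcast states.
            return BroadcastStream(self, list(args))
        # No descriptors: only change the partitioning.
        return DataStream(partitioning="broadcast")


ds = DataStream()
repartitioned = ds.broadcast()
bs = ds.broadcast(MapStateDescriptor("rules"))
```

With these stubs, a type checker can infer `DataStream` for the zero-argument call and `BroadcastStream` for the call with descriptors, instead of a `Union` that callers would have to narrow themselves.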
   



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
 
 
+class BroadcastStream(object):
+    """
+    A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This can be created by any
+    stream using the :meth:`DataStream.broadcast` method and implicitly creates states where the
+    user can store elements of the created :class:`BroadcastStream`. (see
+    :class:`BroadcastConnectedStream`).
+    Note that no further operation can be applied to these streams. The only available option is
+    to connect them with a keyed or non-keyed stream, using the :meth:`KeyedStream.connect` and the
+    :meth:`DataStream.connect` respectively. Applying these methods will result in a
+    :class:`BroadcastConnectedStream` for further processing.
+    """
+
+    def __init__(
+        self,
+        input_stream: "DataStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.input_stream = input_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+    """
+    A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, with
+    a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of
+    :class:`ConnectedStreams` these streams are useful for cases where operations on one stream
+    directly affect the operations on the other stream, usually via shared state between the
+    streams.
+    An example for the use of such connected streams would be to apply rules that change over time
+    onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will
+    store them in the broadcast state, while the other stream will contain the elements to apply the
+    rules to. By broadcasting the rules, these will be available in all parallel instances, and can
+    be applied to all partitions of the other stream.
+    """
+
+    def __init__(
+        self,
+        non_broadcast_stream: "DataStream",
+        broadcast_stream: "BroadcastStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.non_broadcast_stream = non_broadcast_stream
+        self.broadcast_stream = broadcast_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+    def process(
+        self,
+        func: BroadcastProcessFunction,
+        output_type: TypeInformation = None,
+    ) -> "DataStream":
+        """
+        Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or
+        :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` or on them,
+        thereby creating a transformed output stream.
+
+        :param func: The :class:`BroadcastProcessFunction` that is called for each element in the
+            stream.
+        :param output_type: The type of the output elements, should be
+            :class:`common.TypeInformation` or list (implicit :class:`RowTypeInfo`) or None (
+            implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+        :return: The transformed :class:`DataStream`.
+        """
+        if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream():
+            raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream")
+
+        j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation()
+        j_input_transformation2 = (
+            self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+        )
+
+        if output_type is None:
+            output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
+        elif isinstance(output_type, list):
+            output_type_info = RowTypeInfo(output_type)
+        elif isinstance(output_type, TypeInformation):
+            output_type_info = output_type
+        else:
+            raise TypeError("output_type must be None, list or TypeInformation")
+        j_output_type = output_type_info.get_java_type_info()
+
+        from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
+
+        func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS  # type: ignore
+        func_name = "Co-Process-Broadcast"
+
+        gateway = get_gateway()
+        JPythonBroadcastStateTransformation = (
+            gateway.jvm.org.apache.flink.streaming.api.transformations.python
+        ).PythonBroadcastStateTransformation
+        j_state_names = ListConverter().convert(

Review Comment:
   Does the type info set in the `MapStateDescriptor` have any effect?



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -484,15 +486,25 @@ def union(self, *streams: 'DataStream') -> 'DataStream':
         j_united_stream = self._j_data_stream.union(j_data_stream_arr)
         return DataStream(j_data_stream=j_united_stream)
 
-    def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
-        """
-        Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of (possible)
-        different types with each other. The DataStreams connected using this operator can
-        be used with CoFunctions to apply joint transformations.
-
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The `ConnectedStreams`.
-        """
+    def connect(
+        self, ds: Union["DataStream", "BroadcastStream"]
+    ) -> Union["ConnectedStreams", "BroadcastConnectedStream"]:
+        """
+        If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting
+        DataStream outputs of (possible) different types with each other. The DataStreams connected
+        using this operator can be used with CoFunctions to apply joint transformations.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by
+        connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can
+        be created using the :meth:`broadcast` method. The resulting stream can be further processed
+        using the :meth:`BroadcastConnectedStream.process` method.

Review Comment:
   add a note to declare the added version
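
One common way to record this in a Sphinx-style docstring is the `versionchanged` directive (or `versionadded` for entirely new classes and methods). The sketch below is illustrative only: the toy `DataStream` class is a stand-in, and `X.Y.Z` is a placeholder, since the thread does not state the release version.

```python
class DataStream:
    """Toy stand-in used only to show the docstring layout."""

    def connect(self, ds):
        """
        Connects this stream with another stream.

        .. versionchanged:: X.Y.Z
           Support connecting with a broadcast stream.

        :param ds: The stream with which this stream will be connected.
        :return: The connected streams.
        """
        return (self, ds)


doc = DataStream.connect.__doc__
```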



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +573,33 @@ def forward(self) -> 'DataStream':
         """
         return DataStream(self._j_data_stream.forward())
 
-    def broadcast(self) -> 'DataStream':
+    def broadcast(self, *args) -> Union["DataStream", "BroadcastStream"]:
         """
         Sets the partitioning of the DataStream so that the output elements are broadcasted to every
         parallel instance of the next operation.
+        If :class:`~state.MapStateDescriptor` (s) are passed in, it returns a
+        :class:`BroadcastStream` with :class:`~state.BroadcastState` (s) implicitly created as the
+        descriptors specified.
 
-        :return: The DataStream with broadcast partitioning set.
-        """
+        Example:

Review Comment:
   We also need the connect(DataStream) examples



##########
flink-python/pyflink/fn_execution/beam/beam_operations.py:
##########
@@ -151,9 +150,19 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
         input=None,
         side_inputs=None,
         output_coders=[output_coders[tag] for tag in output_tags])
+    serialized_fn = spec.serialized_fn
     name = common.NameContext(transform_proto.unique_name)
 
-    serialized_fn = spec.serialized_fn
+    if isinstance(serialized_fn, flink_fn_execution_pb2.UserDefinedDataStreamFunction):
+        operator_state_backend = RemoteOperatorStateBackend(
+            factory.state_handler,
+            serialized_fn.state_cache_size,
+            serialized_fn.map_state_read_cache_size,
+            serialized_fn.map_state_write_cache_size,
+        )
+    else:
+        operator_state_backend = None

Review Comment:
   What about when `serialized_fn` is not an instance of `flink_fn_execution_pb2.UserDefinedDataStreamFunction`?
   



##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Co-Broadcast-Process operation, which will be
+ * translated into different operations by @{link}.

Review Comment:
   ```suggestion
    * translated into different operations by {@link PythonBroadcastStateTransformationTranslator}.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19878:
URL: https://github.com/apache/flink/pull/19878#issuecomment-1146988254

   ## CI report:
   
   * abf3ea2bd984da12b61ae926ff60367c29ccbfe4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Vancior commented on a diff in pull request #19878: [FLINK-27584][python] Support non-keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r897928475


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
 
 
+class BroadcastStream(object):
+    """
+    A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This can be created by any
+    stream using the :meth:`DataStream.broadcast` method and implicitly creates states where the
+    user can store elements of the created :class:`BroadcastStream`. (see
+    :class:`BroadcastConnectedStream`).
+    Note that no further operation can be applied to these streams. The only available option is
+    to connect them with a keyed or non-keyed stream, using the :meth:`KeyedStream.connect` and the
+    :meth:`DataStream.connect` respectively. Applying these methods will result in a
+    :class:`BroadcastConnectedStream` for further processing.
+    """
+
+    def __init__(
+        self,
+        input_stream: "DataStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.input_stream = input_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+    """
+    A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, with
+    a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of
+    :class:`ConnectedStreams` these streams are useful for cases where operations on one stream
+    directly affect the operations on the other stream, usually via shared state between the
+    streams.
+    An example for the use of such connected streams would be to apply rules that change over time
+    onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will
+    store them in the broadcast state, while the other stream will contain the elements to apply the
+    rules to. By broadcasting the rules, these will be available in all parallel instances, and can
+    be applied to all partitions of the other stream.
+    """
+
+    def __init__(
+        self,
+        non_broadcast_stream: "DataStream",
+        broadcast_stream: "BroadcastStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.non_broadcast_stream = non_broadcast_stream
+        self.broadcast_stream = broadcast_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+    def process(
+        self,
+        func: BroadcastProcessFunction,
+        output_type: TypeInformation = None,
+    ) -> "DataStream":
+        """
+        Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or
+        :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` on them,
+        thereby creating a transformed output stream.
+
+        :param func: The :class:`BroadcastProcessFunction` that is called for each element in the
+            stream.
+        :param output_type: The type of the output elements, should be
+            :class:`common.TypeInformation` or list (implicit :class:`RowTypeInfo`) or None (
+            implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+        :return: The transformed :class:`DataStream`.
+        """
+        if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream():
+            raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream")
+
+        j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation()
+        j_input_transformation2 = (
+            self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+        )
+
+        if output_type is None:
+            output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
+        elif isinstance(output_type, list):
+            output_type_info = RowTypeInfo(output_type)
+        elif isinstance(output_type, TypeInformation):
+            output_type_info = output_type
+        else:
+            raise TypeError("output_type must be None, list or TypeInformation")
+        j_output_type = output_type_info.get_java_type_info()
+
+        from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
+
+        func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS  # type: ignore
+        func_name = "Co-Process-Broadcast"
+
+        gateway = get_gateway()
+        JPythonBroadcastStateTransformation = (
+            gateway.jvm.org.apache.flink.streaming.api.transformations.python
+        ).PythonBroadcastStateTransformation
+        j_state_names = ListConverter().convert(

Review Comment:
   Actually no, but `AbstractBroadcastStateTransformation`, the base class of `PythonBroadcastStateTransformation`, requires this in its constructor, so it is only there to satisfy the expected behavior of the base class, although in PyFlink we are not restricting which broadcast states can be accessed at runtime.
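
For readers following the hunk above, the rule-broadcast pattern described in the `BroadcastConnectedStream` docstring can be illustrated with a small plain-Python simulation. This is only a sketch under simplifying assumptions, not how the PyFlink runtime works: `ParallelInstance` and `run` are hypothetical names, rules are modeled as `(name, threshold)` pairs, all rules are delivered before any event (real jobs give no such ordering guarantee), and events are distributed round-robin.

```python
from typing import Dict, List, Tuple


class ParallelInstance:
    """One parallel subtask holding its own copy of the broadcast state."""

    def __init__(self) -> None:
        self.broadcast_state: Dict[str, int] = {}
        self.output: List[Tuple[str, int]] = []

    def process_broadcast_element(self, rule: Tuple[str, int]) -> None:
        # Broadcast side: store the rule so later elements can be checked against it.
        name, threshold = rule
        self.broadcast_state[name] = threshold

    def process_element(self, value: int) -> None:
        # Non-broadcast side: apply every stored rule to the incoming element.
        for name, threshold in sorted(self.broadcast_state.items()):
            if value > threshold:
                self.output.append((name, value))


def run(rules: List[Tuple[str, int]], events: List[int],
        parallelism: int = 2) -> List[ParallelInstance]:
    instances = [ParallelInstance() for _ in range(parallelism)]
    # Every rule reaches every parallel instance.
    for rule in rules:
        for instance in instances:
            instance.process_broadcast_element(rule)
    # Each event reaches exactly one instance (round-robin here).
    for index, value in enumerate(events):
        instances[index % parallelism].process_element(value)
    return instances


instances = run(rules=[("high", 10), ("low", 3)], events=[1, 5, 20, 4])
for number, instance in enumerate(instances):
    print(number, instance.broadcast_state, instance.output)
```

Because each instance receives every rule, all instances end up with identical broadcast state, while each event is evaluated on only one of them.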


