Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/14 12:51:41 UTC

[GitHub] [flink] HuangXingBo commented on a diff in pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing

HuangXingBo commented on code in PR #20144:
URL: https://github.com/apache/flink/pull/20144#discussion_r921069655


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -56,6 +55,8 @@
 __all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
            'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream']
 
+from pyflink.util.java_utils import to_jarray

Review Comment:
   Move the import before `__all__`.



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1648,10 +1649,16 @@ def connect(self, ds: Union['DataStream', 'BroadcastStream']) \
         DataStream outputs of (possible) different types with each other. The DataStreams connected
         using this operator can be used with CoFunctions to apply joint transformations.
 
-        Currently, connect(BroadcastStream) is not supported.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by
+        connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can
+        be created using the :meth:`broadcast` method. The resulting stream can be further processed
+        using the :meth:`BroadcastConnectedStream.process` method.
+
+        :param ds: The DataStream or BroadcastStream with which this stream will be connected.
+        :return: The ConnectedStreams or BroadcastConnectedStream.
 
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The ConnectedStreams.
+        .. versionchanged:: 1.16.0
+            Support connect BroadcastStream

Review Comment:
   ```suggestion
              Support connect BroadcastStream
   ```
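The updated `connect` docstring describes a return type that depends on the argument: a `BroadcastStream` produces a `BroadcastConnectedStream`, and any other stream produces `ConnectedStreams`. The sketch below models only that dispatch. The classes reuse PyFlink's names for readability, but they are hypothetical, simplified stand-ins with none of PyFlink's behavior. This is not the PR's implementation.

```python
from typing import Union


class BroadcastStream:
    """Hypothetical stand-in for a stream produced by broadcast()."""

    def __init__(self, name: str):
        self.name = name


class ConnectedStreams:
    """Hypothetical result of connecting two regular streams."""

    def __init__(self, first: "DataStream", second: "DataStream"):
        self.first, self.second = first, second


class BroadcastConnectedStream:
    """Hypothetical result of connecting a stream with a broadcast stream."""

    def __init__(self, non_broadcast: "DataStream", broadcast: BroadcastStream):
        self.non_broadcast, self.broadcast = non_broadcast, broadcast


class DataStream:
    def __init__(self, name: str):
        self.name = name

    def connect(self, ds: Union["DataStream", BroadcastStream]):
        # Dispatch on the argument type, following the documented contract.
        if isinstance(ds, BroadcastStream):
            return BroadcastConnectedStream(self, ds)
        return ConnectedStreams(self, ds)


events = DataStream("events")
rules = BroadcastStream("rules")
print(type(events.connect(rules)).__name__)                # BroadcastConnectedStream
print(type(events.connect(DataStream("other"))).__name__)  # ConnectedStreams
```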



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -1381,31 +1388,159 @@ class ReadOnlyContext(BaseBroadcastProcessFunction.ReadOnlyContext, ABC):
     def process_element(self, value: IN1, ctx: ReadOnlyContext):
         """
         This method is called for each element in the (non-broadcast) :class:`DataStream`.
+
+        This function can output zero or more elements via :code:`yield` statement, and query the
+        current processing/event time. Finally, it has read-only access to the broadcast state. The
+        context is only valid during the invocation of this method, do not store it.
+
+        :param value: The stream element.
+        :param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
+            timestamp of the element, querying the current processing/event time and reading the

Review Comment:
   ```suggestion
           :param ctx: 
               A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
               timestamp of the element, querying the current processing/event time and reading the
   ```
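The `process_element` docstring makes three claims: output is emitted with `yield` (zero or more elements), broadcast state is read-only on the non-broadcast side, and the context must not be kept beyond the call. The following is a minimal sketch of those semantics. `ReadOnlyContext` and `ThresholdFilter` here are hypothetical stand-ins. Looking up broadcast state by a plain string name is a simplification assumed for illustration and is not PyFlink's lookup mechanism.

```python
from types import MappingProxyType
from typing import Any, Dict, Iterator, Mapping


class ReadOnlyContext:
    """Hypothetical stand-in: hands out broadcast state as read-only views."""

    def __init__(self, broadcast_states: Dict[str, Dict[str, Any]]):
        self._states = broadcast_states

    def get_broadcast_state(self, name: str) -> Mapping[str, Any]:
        # MappingProxyType is a read-only view, so the non-broadcast side
        # cannot modify broadcast state through it.
        return MappingProxyType(self._states[name])


class ThresholdFilter:
    """Hypothetical function: emits an element only if it exceeds a broadcast threshold."""

    def process_element(self, value: int, ctx: ReadOnlyContext) -> Iterator[int]:
        threshold = ctx.get_broadcast_state("rules").get("threshold")
        if threshold is not None and value > threshold:
            yield value  # zero or one output here; a generator may yield any number


states = {"rules": {"threshold": 10}}
fn = ThresholdFilter()
outputs = []
for v in [5, 12, 30]:
    ctx = ReadOnlyContext(states)  # a fresh context per call, never stored
    outputs.extend(fn.process_element(v, ctx))
print(outputs)  # [12, 30]
```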



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function under BATCH mode, {@link PythonKeyedCoProcessOperator} is used under
+ * STREAMING mode. This operator forces to run out data from broadcast side first, and then process
+ * data from regular side.
+ *
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
+        extends PythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput, InputSelectable {
+
+    private transient volatile boolean isBroadcastSideDone;

Review Comment:
   ```suggestion
       private transient volatile boolean isBroadcastSideDone = false;
   ```
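The Javadoc states that the batch operator drains the broadcast side before it processes the regular side, and the `isBroadcastSideDone` flag tracks that switch. Below is a Python model of that input-selection logic. It assumes, for illustration only, that input 1 is the keyed side and input 2 is the broadcast side. `BroadcastFirstSelector` and `run` are hypothetical names that loosely mirror `InputSelectable.nextSelection()` and `BoundedMultiInput.endInput(int)`.

```python
from typing import Dict, List, Tuple

# Assumed input ids: 1 = keyed (regular) side, 2 = broadcast side.
REGULAR, BROADCAST = 1, 2


class BroadcastFirstSelector:
    """Hypothetical model of the 'broadcast side first' selection."""

    def __init__(self) -> None:
        self.is_broadcast_side_done = False

    def next_selection(self) -> int:
        # Read only the broadcast input until it has ended, then switch.
        return REGULAR if self.is_broadcast_side_done else BROADCAST

    def end_input(self, input_id: int) -> None:
        if input_id == BROADCAST:
            self.is_broadcast_side_done = True


def run(regular: List[str], broadcast: List[str]) -> List[Tuple[int, str]]:
    """Drive the selector over two bounded inputs and record processing order."""
    queues: Dict[int, List[str]] = {REGULAR: list(regular), BROADCAST: list(broadcast)}
    selector = BroadcastFirstSelector()
    order: List[Tuple[int, str]] = []
    while queues[REGULAR] or queues[BROADCAST]:
        chosen = selector.next_selection()
        if queues[chosen]:
            order.append((chosen, queues[chosen].pop(0)))
        else:
            selector.end_input(chosen)  # the selected bounded input is exhausted
    return order


print(run(["a", "b"], ["r1", "r2"]))  # [(2, 'r1'), (2, 'r2'), (1, 'a'), (1, 'b')]
```

The apparent rationale for forcing this order in BATCH mode is that every regular-side element is then processed against the complete broadcast state.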



##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will
+ * be translated into different operations by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
+ */
+@Internal
+public class PythonKeyedBroadcastStateTransformation<OUT>

Review Comment:
   Could `PythonKeyedBroadcastStateTransformation` extend `PythonBroadcastStateTransformation`?



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>

Review Comment:
   Add a `serialVersionUID`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.