Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/04 03:38:40 UTC

[GitHub] [flink] Vancior opened a new pull request, #20144: [FLINK-27587][python] Support keyed co-broadcast processing

Vancior opened a new pull request, #20144:
URL: https://github.com/apache/flink/pull/20144

   
   ## What is the purpose of the change
   
   This PR implements keyed broadcast processing ( `KeyedStream.connect(BroadcastStream).process(KeyedBroadcastProcessFunction)` ) in PyFlink.
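
The shape of this operation can be sketched without a Flink cluster. The following is a plain-Python model of the semantics only, not the PyFlink API: `KeyedBroadcastModel`, its method names, and the rule/event data are all hypothetical. It assumes the usual broadcast-state contract, in which the broadcast side may write the shared state, the keyed side may only read it, and keyed state is scoped to the current key.

```python
from collections import defaultdict
from types import MappingProxyType


class KeyedBroadcastModel:
    """Toy model: one broadcast map shared by all keys, one running total per key."""

    def __init__(self):
        self.broadcast_state = {}            # written only from the broadcast side
        self.keyed_state = defaultdict(int)  # per-key state

    def process_broadcast_element(self, rule):
        # Broadcast side: store a (name, threshold) rule in the shared state.
        name, threshold = rule
        self.broadcast_state[name] = threshold

    def process_element(self, key, value):
        # Keyed side: read-only view of the broadcast state, read/write keyed state.
        rules = MappingProxyType(self.broadcast_state)
        self.keyed_state[key] += value
        total = self.keyed_state[key]
        for name, threshold in rules.items():
            if total >= threshold:
                yield (key, name, total)


model = KeyedBroadcastModel()
model.process_broadcast_element(("large", 10))

out = []
for key, value in [("a", 4), ("b", 11), ("a", 7)]:
    out.extend(model.process_element(key, value))
print(out)
```

Key `"a"` only crosses the threshold on its second element, because the running total is kept per key while the rule itself is shared by every key.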
   
   
   ## Brief change log
   
   - add `KeyedBroadcastProcessFunction` interface in Python
   - introduce `PythonKeyedBroadcastStateTransformation` and its translator, which compiles keyed broadcast processing into `PythonKeyedCoProcessOperator` or `PythonBatchKeyedCoBroadcastProcessOperator`.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - integration test `test_keyed_co_broadcast_process` in test_data_stream.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (Python Sphinx doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20144:
URL: https://github.com/apache/flink/pull/20144#issuecomment-1173306587

   ## CI report:
   
   * 5b4fd94b639113fbbed640ba890d38f03e23d971 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Vancior commented on a diff in pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20144:
URL: https://github.com/apache/flink/pull/20144#discussion_r921789203


##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will
+ * be translated into different operations by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
+ */
+@Internal
+public class PythonKeyedBroadcastStateTransformation<OUT>

Review Comment:
   I tried that before, but it runs into class-casting problems that I could not solve.





[GitHub] [flink] HuangXingBo commented on a diff in pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #20144:
URL: https://github.com/apache/flink/pull/20144#discussion_r921069655


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -56,6 +55,8 @@
 __all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
            'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream']
 
+from pyflink.util.java_utils import to_jarray

Review Comment:
   move the import before `__all__`



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1648,10 +1649,16 @@ def connect(self, ds: Union['DataStream', 'BroadcastStream']) \
         DataStream outputs of (possible) different types with each other. The DataStreams connected
         using this operator can be used with CoFunctions to apply joint transformations.
 
-        Currently, connect(BroadcastStream) is not supported.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by
+        connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can
+        be created using the :meth:`broadcast` method. The resulting stream can be further processed
+        using the :meth:`BroadcastConnectedStream.process` method.
+
+        :param ds: The DataStream or BroadcastStream with which this stream will be connected.
+        :return: The ConnectedStreams or BroadcastConnectedStream.
 
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The ConnectedStreams.
+        .. versionchanged:: 1.16.0
+            Support connect BroadcastStream

Review Comment:
   ```suggestion
              Support connect BroadcastStream
   ```



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -1381,31 +1388,159 @@ class ReadOnlyContext(BaseBroadcastProcessFunction.ReadOnlyContext, ABC):
     def process_element(self, value: IN1, ctx: ReadOnlyContext):
         """
         This method is called for each element in the (non-broadcast) :class:`DataStream`.
+
+        This function can output zero or more elements via :code:`yield` statement, and query the
+        current processing/event time. Finally, it has read-only access to the broadcast state. The
+        context is only valid during the invocation of this method, do not store it.
+
+        :param value: The stream element.
+        :param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
+            timestamp of the element, querying the current processing/event time and reading the

Review Comment:
   ```suggestion
           :param ctx: 
               A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
               timestamp of the element, querying the current processing/event time and reading the
   ```



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function under BATCH mode, {@link PythonKeyedCoProcessOperator} is used under
+ * STREAMING mode. This operator forces to run out data from broadcast side first, and then process
+ * data from regular side.
+ *
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
+        extends PythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput, InputSelectable {
+
+    private transient volatile boolean isBroadcastSideDone;

Review Comment:
   ```suggestion
       private transient volatile boolean isBroadcastSideDone = false;
   ```
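
The ordering described in the Javadoc above (drain the broadcast input first, then the regular input) can be illustrated with a small plain-Python model. This is a sketch under assumptions, not the operator's code: it assumes the broadcast side is input 2 and the regular side is input 1, and `BatchCoBroadcastModel`, `next_selection`, `end_input`, and `run` are hypothetical names that loosely mirror Flink's `InputSelectable` and `BoundedMultiInput` interfaces.

```python
class BatchCoBroadcastModel:
    """Toy operator that refuses regular-side records until the broadcast side ends."""

    def __init__(self):
        self.is_broadcast_side_done = False
        self.broadcast_state = {}
        self.output = []

    def next_selection(self):
        # Read input 2 (broadcast) until it is finished, then input 1 (regular).
        return 1 if self.is_broadcast_side_done else 2

    def end_input(self, input_id):
        if input_id == 2:
            self.is_broadcast_side_done = True

    def process_element1(self, record):
        # Regular side: the broadcast state is guaranteed to be complete here.
        assert self.is_broadcast_side_done, "regular element arrived too early"
        key, value = record
        self.output.append((key, value, self.broadcast_state.get(key)))

    def process_element2(self, record):
        # Broadcast side: populate the shared state.
        key, value = record
        self.broadcast_state[key] = value


def run(op, regular, broadcast):
    """Hypothetical driver: always reads from the input the operator selects."""
    inputs = {1: iter(regular), 2: iter(broadcast)}
    handlers = {1: op.process_element1, 2: op.process_element2}
    open_inputs = {1, 2}
    end = object()  # sentinel marking an exhausted input
    while open_inputs:
        selected = op.next_selection()
        record = next(inputs[selected], end)
        if record is end:
            op.end_input(selected)
            open_inputs.discard(selected)
        else:
            handlers[selected](record)
    return op.output


result = run(
    BatchCoBroadcastModel(),
    regular=[("k1", "x"), ("k2", "y")],
    broadcast=[("k1", "rule-1")],
)
print(result)
```

In this model the flag starts as `False` and flips only when the broadcast input reports its end, which is the property that lets every regular-side record see the full broadcast state.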



##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will
+ * be translated into different operations by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
+ */
+@Internal
+public class PythonKeyedBroadcastStateTransformation<OUT>

Review Comment:
   Could `PythonKeyedBroadcastStateTransformation` extend `PythonBroadcastStateTransformation`?



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function under BATCH mode, {@link PythonKeyedCoProcessOperator} is used under
+ * STREAMING mode. This operator forces to run out data from broadcast side first, and then process
+ * data from regular side.
+ *
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>

Review Comment:
   add `serialVersionUID`





[GitHub] [flink] HuangXingBo closed pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing

Posted by GitBox <gi...@apache.org>.
HuangXingBo closed pull request #20144: [FLINK-27587][python] Support keyed co-broadcast processing
URL: https://github.com/apache/flink/pull/20144

