Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/13 09:03:06 UTC

[GitHub] [flink] Vancior opened a new pull request, #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Vancior opened a new pull request, #19453:
URL: https://github.com/apache/flink/pull/19453

   ## What is the purpose of the change
   
   This PR introduces side output support in the PyFlink DataStream API. A function can use `yield tag, data` to emit data to a side stream, and `DataStream.get_side_output(tag)` returns the corresponding stream. `WindowedStream.side_output_late_data(tag)` is also supported.
   
   ## Brief change log
   
   - introduce multiple output PCollections in the Beam executable stage, each assigned to the main stream or a side output stream
   - tag side output data with the user tag and main stream data with a default tag, then dispatch data to different PCollections according to the tag
   - add `late_data_output_tag` in `WindowOperator` and yield late data with this tag
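
The tagging and dispatch described in the change log can be pictured with a small pure-Python sketch. It uses neither PyFlink nor Beam: `Tag`, `DEFAULT_TAG`, `split_by_parity`, and `dispatch` are hypothetical names chosen for illustration, and plain lists stand in for PCollections. It is a sketch of the idea, not the PR's implementation.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Tag:
    """Hypothetical stand-in for an output tag; only the id matters here."""
    tag_id: str


DEFAULT_TAG = Tag("main")  # assumed name for the default (main stream) tag
ODD = Tag("odd")


def split_by_parity(value):
    """User function: odd numbers also go to a side output."""
    if value % 2 == 1:
        yield ODD, value  # side output, in the `yield tag, data` form
    yield value           # main output, bare data


def dispatch(user_fn, inputs):
    """Route every yielded result to the list registered for its tag."""
    collections = defaultdict(list)
    for element in inputs:
        for result in user_fn(element):
            is_tagged = (
                isinstance(result, tuple)
                and len(result) == 2
                and isinstance(result[0], Tag)
            )
            tag, data = result if is_tagged else (DEFAULT_TAG, result)
            collections[tag.tag_id].append(data)
    return dict(collections)


outputs = dispatch(split_by_parity, [1, 2, 3, 4])
print(outputs)
```

Untagged results fall through to the default tag, so existing functions that only `yield data` keep their behavior in this sketch.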
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - added integration tests in test_data_stream.py and test_window.py, which check that the main stream and side streams correctly flow to different sinks
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Vancior commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r854718202


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -102,85 +106,144 @@ public BeamDataStreamPythonFunctionRunner(
 
     @Override
     protected void buildTransforms(RunnerApi.Components.Builder componentsBuilder) {
-        for (int i = 0; i < userDefinedDataStreamFunctions.size() + 1; i++) {
-            String functionUrn;
-            if (i == 0) {
-                functionUrn = headOperatorFunctionUrn;
-            } else {
-                functionUrn = STATELESS_FUNCTION_URN;
-            }
+        for (int i = 0; i < userDefinedDataStreamFunctions.size(); i++) {
 
-            FlinkFnApi.UserDefinedDataStreamFunction functionProto;
-            if (i < userDefinedDataStreamFunctions.size()) {
-                functionProto = userDefinedDataStreamFunctions.get(i);
-            } else {
-                // the last function in the operation tree is used to prune the watermark column
-                functionProto = createReviseOutputDataStreamFunctionProto();
-            }
+            final Map<String, String> outputCollectionMap = new HashMap<>();
 
-            // Use ParDoPayload as a wrapper of the actual payload as timer is only supported in
-            // ParDo
-            final RunnerApi.ParDoPayload.Builder payloadBuilder =
-                    RunnerApi.ParDoPayload.newBuilder()
-                            .setDoFn(
-                                    RunnerApi.FunctionSpec.newBuilder()
-                                            .setUrn(functionUrn)
-                                            .setPayload(
-                                                    org.apache.beam.vendor.grpc.v1p26p0.com.google
-                                                            .protobuf.ByteString.copyFrom(
-                                                            functionProto.toByteArray()))
-                                            .build());
-
-            // Timer is only available in the head operator
-            if (i == 0 && timerCoderDescriptor != null) {
-                payloadBuilder.putTimerFamilySpecs(
-                        TIMER_ID,
-                        RunnerApi.TimerFamilySpec.newBuilder()
-                                // this field is not used, always set it as event time
-                                .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
-                                .setTimerFamilyCoderId(WRAPPER_TIMER_CODER_ID)
-                                .build());
+            // Prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1) {

Review Comment:
   This adds outputs to the local variable `outputCollectionMap` inside the loop.





[GitHub] [flink] flinkbot commented on pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19453:
URL: https://github.com/apache/flink/pull/19453#issuecomment-1097768130

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d76047371e17c0de2df12772137136b0c99588a1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d76047371e17c0de2df12772137136b0c99588a1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d76047371e17c0de2df12772137136b0c99588a1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Vancior commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r854719409


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -190,7 +194,8 @@ public BeamPythonFunctionRunner(
             MemoryManager memoryManager,
             double managedMemoryFraction,
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
-            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
+            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor,
+            @Nullable Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) {

Review Comment:
   Table operators don't have side outputs, so I assume it's reasonable to use `null` for `sideOutputCoderDescriptors` in `BeamTablePythonFunctionRunner`?





[GitHub] [flink] dianfu closed pull request #19453: [FLINK-26481][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #19453: [FLINK-26481][python] Support side output in PyFlink DataStream API
URL: https://github.com/apache/flink/pull/19453




[GitHub] [flink] dianfu commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r852939230


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -190,7 +194,8 @@ public BeamPythonFunctionRunner(
             MemoryManager memoryManager,
             double managedMemoryFraction,
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
-            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
+            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor,
+            @Nullable Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) {

Review Comment:
   Remove the `@Nullable` annotation and use `Preconditions.checkNotNull` to make sure that the given value is not null.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -102,85 +106,144 @@ public BeamDataStreamPythonFunctionRunner(
 
     @Override
     protected void buildTransforms(RunnerApi.Components.Builder componentsBuilder) {
-        for (int i = 0; i < userDefinedDataStreamFunctions.size() + 1; i++) {
-            String functionUrn;
-            if (i == 0) {
-                functionUrn = headOperatorFunctionUrn;
-            } else {
-                functionUrn = STATELESS_FUNCTION_URN;
-            }
+        for (int i = 0; i < userDefinedDataStreamFunctions.size(); i++) {
 
-            FlinkFnApi.UserDefinedDataStreamFunction functionProto;
-            if (i < userDefinedDataStreamFunctions.size()) {
-                functionProto = userDefinedDataStreamFunctions.get(i);
-            } else {
-                // the last function in the operation tree is used to prune the watermark column
-                functionProto = createReviseOutputDataStreamFunctionProto();
-            }
+            final Map<String, String> outputCollectionMap = new HashMap<>();
 
-            // Use ParDoPayload as a wrapper of the actual payload as timer is only supported in
-            // ParDo
-            final RunnerApi.ParDoPayload.Builder payloadBuilder =
-                    RunnerApi.ParDoPayload.newBuilder()
-                            .setDoFn(
-                                    RunnerApi.FunctionSpec.newBuilder()
-                                            .setUrn(functionUrn)
-                                            .setPayload(
-                                                    org.apache.beam.vendor.grpc.v1p26p0.com.google
-                                                            .protobuf.ByteString.copyFrom(
-                                                            functionProto.toByteArray()))
-                                            .build());
-
-            // Timer is only available in the head operator
-            if (i == 0 && timerCoderDescriptor != null) {
-                payloadBuilder.putTimerFamilySpecs(
-                        TIMER_ID,
-                        RunnerApi.TimerFamilySpec.newBuilder()
-                                // this field is not used, always set it as event time
-                                .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
-                                .setTimerFamilyCoderId(WRAPPER_TIMER_CODER_ID)
-                                .build());
+            // Prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1) {

Review Comment:
   Does it make sense to move this out of the for loop?



##########
flink-python/pyflink/datastream/tests/test_window.py:
##########
@@ -346,6 +349,40 @@ def test_session_window_late_merge(self):
         expected = ['(hi,3)']
         self.assert_equals_sorted(expected, results)
 
+    def test_side_output_late_data(self):
+        self.env.set_parallelism(1)
+        config = Configuration(
+            j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)
+        )
+        config.set_integer('python.fn-execution.bundle.size', 1)
+        jvm = get_gateway().jvm
+        watermark_strategy = WatermarkStrategy(
+            jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator(
+                jvm.org.apache.flink.streaming.api.functions.python.eventtime.
+                PerElementWatermarkGenerator.getSupplier()
+            )
+        ).with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)],
+                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
+            .key_by(lambda e: e[0]) \
+            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
+            .allowed_lateness(0) \
+            .side_output_late_data(tag) \
+            .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))

Review Comment:
   There are some changes to `CountWindowProcessFunction` in the latest master. The PR needs to be rebased and this part adjusted accordingly.



##########
flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java:
##########
@@ -98,12 +100,15 @@ public class PythonOperatorChainingOptimizer {
      */
     @SuppressWarnings("unchecked")
     public static void apply(StreamExecutionEnvironment env) throws Exception {
+        final Field transformationsField =
+                StreamExecutionEnvironment.class.getDeclaredField("transformations");
+        transformationsField.setAccessible(true);
+        final List<Transformation<?>> transformations =
+                (List<Transformation<?>>) transformationsField.get(env);
+
+        preprocessSideOutput(transformations);

Review Comment:
   What about moving this to PythonConfigUtils?





[GitHub] [flink] HuangXingBo commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r851041415


##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))

Review Comment:
   Add more examples covering the cases where the `type_info` argument is omitted and where `type_info` is a list.



##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))
+
+    """
+
+    def __init__(self, tag_id: str, type_info: TypeInformation = None):
+        if tag_id == "":
+            raise ValueError("tag_id cannot be empty string")
+        self.tag_id = tag_id
+        if type_info is None:
+            self.type_info = Types.PICKLED_BYTE_ARRAY()
+        elif isinstance(type_info, list):
+            self.type_info = RowTypeInfo(type_info)
+        else:

Review Comment:
   Check that `type_info` is an instance of `TypeInformation`.
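
A minimal sketch of what such a check could look like, written against stand-in classes so it runs without PyFlink. `TypeInformation`, `RowTypeInfo`, and the type constants below are simplified stand-ins, and `resolve_type_info` is a hypothetical helper that mirrors the three branches of the constructor quoted above and adds the requested `isinstance` check. The choice of `TypeError` is an assumption, not something stated in this review.

```python
class TypeInformation:
    """Stand-in for pyflink.common.typeinfo.TypeInformation (not the real class)."""

    def __init__(self, name):
        self.name = name


class RowTypeInfo(TypeInformation):
    """Stand-in row type that wraps a list of field types."""

    def __init__(self, field_types):
        super().__init__("ROW")
        self.field_types = list(field_types)


PICKLED_BYTE_ARRAY = TypeInformation("PICKLED_BYTE_ARRAY")
STRING = TypeInformation("STRING")
LONG = TypeInformation("LONG")


def resolve_type_info(type_info=None):
    """Mirror the constructor's branches and reject anything else."""
    if type_info is None:
        return PICKLED_BYTE_ARRAY          # type_info omitted
    if isinstance(type_info, list):
        return RowTypeInfo(type_info)      # list of field types
    if not isinstance(type_info, TypeInformation):
        raise TypeError("type_info must be a TypeInformation or a list")
    return type_info                       # explicit TypeInformation


omitted = resolve_type_info()
from_list = resolve_type_info([STRING, LONG])
explicit = resolve_type_info(STRING)

try:
    resolve_type_info("STRING")  # a plain string is not a TypeInformation
    rejected = False
except TypeError:
    rejected = True

print(omitted.name, from_list.name, explicit.name, rejected)
```

The same three call forms (omitted, list, explicit) are the ones a docstring example could cover.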



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -131,6 +133,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor;
     protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor;
+    protected final Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors;

Review Comment:
   It is confusing that SideOutput appears in `BeamPythonFunctionRunner`



##########
flink-python/pyflink/datastream/tests/test_data_stream.py:
##########
@@ -755,6 +756,134 @@ def process_element(self, value, ctx):
         expected = ['(1,hi)', '(2,hello)', '(4,hi)', '(6,hello)', '(9,hi)', '(12,hello)']
         self.assert_equals_sorted(expected, results)
 
+    def test_process_side_output(self):

Review Comment:
   Do we need to add a test that uses multiple OutputTags?



##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))
+
+    """
+
+    def __init__(self, tag_id: str, type_info: TypeInformation = None):
+        if tag_id == "":
+            raise ValueError("tag_id cannot be empty string")

Review Comment:
   ```suggestion
               raise ValueError("OutputTag tag_id cannot be None or empty string")
   ```



##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))
+
+    """
+
+    def __init__(self, tag_id: str, type_info: TypeInformation = None):
+        if tag_id == "":
+            raise ValueError("tag_id cannot be empty string")
+        self.tag_id = tag_id
+        if type_info is None:
+            self.type_info = Types.PICKLED_BYTE_ARRAY()
+        elif isinstance(type_info, list):
+            self.type_info = RowTypeInfo(type_info)
+        else:
+            self.type_info = type_info
+
+    def get_java_output_tag(self):
+        gateway = get_gateway()
+        j_obj = gateway.jvm.org.apache.flink.util.OutputTag(self.tag_id,
+                                                            self.type_info.get_java_type_info())
+        # deal with serializability
+        self.type_info._j_typeinfo = None

Review Comment:
   What's the reason for doing this?



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -444,6 +450,21 @@ private ExecutableStage createExecutableStage(RunnerApi.Environment environment)
                         .putCoders(OUTPUT_CODER_ID, createCoderProto(outputCoderDescriptor))
                         .putCoders(WINDOW_CODER_ID, getWindowCoderProto());
 
+        if (sideOutputCoderDescriptors != null) {

Review Comment:
   Would it be better to put this part of the logic in `BeamDataStreamFunctionRunner`?
   



##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))

Review Comment:
   Add an example of incorrect usage in which the `tag_id` argument is None or an empty string.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -165,10 +168,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
     private transient RemoteBundle remoteBundle;
 
     /** The Python function execution result tuple: (raw bytes, length). */

Review Comment:
   Please update this comment to match the change.



##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))
+
+    """
+
+    def __init__(self, tag_id: str, type_info: TypeInformation = None):
+        if tag_id == "":

Review Comment:
   ```suggestion
           if not tag_id:
   ```
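
The difference between the two checks can be seen in a short standalone sketch. `check_original` and `check_suggested` are hypothetical helper names; the error message is the one from the earlier suggestion in this review.

```python
def check_original(tag_id):
    """The check as written in the diff: only an empty string is rejected."""
    if tag_id == "":
        raise ValueError("tag_id cannot be empty string")
    return tag_id


def check_suggested(tag_id):
    """The suggested check: any falsy value, including None, is rejected."""
    if not tag_id:
        raise ValueError("OutputTag tag_id cannot be None or empty string")
    return tag_id


def is_rejected(check, value):
    """Return True if the given check raises ValueError for the value."""
    try:
        check(value)
    except ValueError:
        return True
    return False


for value in ("", None, "late-data"):
    print(repr(value), is_rejected(check_original, value), is_rejected(check_suggested, value))
```

With the original comparison, `None` passes validation because `None == ""` is false; the truthiness test rejects both `None` and `""`.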





[GitHub] [flink] Vancior commented on pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
Vancior commented on PR #19453:
URL: https://github.com/apache/flink/pull/19453#issuecomment-1101067079

   @flinkbot run azure




[GitHub] [flink] Vancior commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r851847821


##########
flink-python/pyflink/datastream/output_tag.py:
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.typeinfo import TypeInformation, Types, RowTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+class OutputTag(object):
+    """
+    An :class:`OutputTag` is a typed and named tag to use for tagging side outputs of an operator.
+
+    Example:
+    ::
+
+        >>> info = OutputTag("late-data", Types.TUPLE([Types.STRING(), Types.LONG()]))
+
+    """
+
+    def __init__(self, tag_id: str, type_info: TypeInformation = None):
+        if tag_id == "":
+            raise ValueError("tag_id cannot be empty string")
+        self.tag_id = tag_id
+        if type_info is None:
+            self.type_info = Types.PICKLED_BYTE_ARRAY()
+        elif isinstance(type_info, list):
+            self.type_info = RowTypeInfo(type_info)
+        else:
+            self.type_info = type_info
+
+    def get_java_output_tag(self):
+        gateway = get_gateway()
+        j_obj = gateway.jvm.org.apache.flink.util.OutputTag(self.tag_id,
+                                                            self.type_info.get_java_type_info())
+        # deal with serializability
+        self.type_info._j_typeinfo = None

Review Comment:
   A tag might be serialized into a UDF after its Java type info has been retrieved, and the serialization would then fail because the tag contains a Py4J reference, which holds a thread lock.
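
The failure mode can be reproduced without Py4J. In the sketch below a plain dict stands in for the tag's state and a `threading.Lock` stands in for the lock held by the Java reference. Both are assumptions for illustration, and the standard `pickle` module is used here, which may differ from the serializer PyFlink applies to UDFs.

```python
import pickle
import threading

# Hypothetical tag state after get_java_type_info() has cached a Java reference.
tag_state = {
    "tag_id": "late-data",
    "j_typeinfo": threading.Lock(),  # stand-in for the lock inside the Py4J reference
}

try:
    pickle.dumps(tag_state)
    pickled_with_reference = True
except (TypeError, pickle.PicklingError):
    # Lock objects cannot be pickled, so the whole state fails to serialize.
    pickled_with_reference = False

# Dropping the cached reference, as the diff does, makes the state serializable.
tag_state["j_typeinfo"] = None
restored = pickle.loads(pickle.dumps(tag_state))

print(pickled_with_reference, restored)
```

Resetting the cached reference trades a repeated lookup for serializability: the Java type info has to be recreated the next time it is needed.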





[GitHub] [flink] dianfu commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r851915271


##########
flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java:
##########
@@ -249,6 +249,19 @@ private static ChainInfo chainWithInputIfPossible(
             }
         }
 
+        if (transform instanceof SideOutputTransformation) {

Review Comment:
   Why put this in the method `chainWithInputIfPossible`? There are two problems:
   - It seems to have nothing to do with the chaining optimization, so it doesn't make sense to put both into one method.
   - If the chaining optimization is disabled (`python.operator-chaining.enabled`), this code will not be executed at all.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java:
##########
@@ -97,6 +122,37 @@ public boolean containsPartitionCustom() {
         return this.containsPartitionCustom;
     }
 
+    public void addSideOutputTag(OutputTag<?> outputTag) {
+        sideOutputTags.put(outputTag.getId(), outputTag);
+    }
+
+    protected Map<String, FlinkFnApi.CoderInfoDescriptor> createSideOutputCoderDescriptors() {
+        Map<String, FlinkFnApi.CoderInfoDescriptor> descriptorMap = new HashMap<>();
+        for (Map.Entry<String, OutputTag<?>> entry : sideOutputTags.entrySet()) {
+            descriptorMap.put(
+                    entry.getKey(),
+                    createRawTypeCoderInfoDescriptorProto(
+                            getSideOutputTypeInfo(entry.getValue()),
+                            FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE,
+                            false));
+        }
+        return descriptorMap;
+    }
+
+    protected OutputTag<?> getOutputTagById(String id) {
+        Preconditions.checkArgument(sideOutputTags.containsKey(id));
+        return sideOutputTags.get(id);
+    }
+
+    protected TypeInformation<Row> getSideOutputTypeInfo(OutputTag<?> outputTag) {

Review Comment:
   ```suggestion
       private TypeInformation<Row> getSideOutputTypeInfo(OutputTag<?> outputTag) {
   ```



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -131,6 +133,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor;
     protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor;
+    protected final Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors;

Review Comment:
   Personally I think it's fine. Side output could also be seen as an important abstraction of the execution mode.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java:
##########
@@ -55,13 +63,30 @@
     /** The TypeInformation of output data. */
     private final TypeInformation<OUT> outputTypeInfo;
 
+    private final Map<String, OutputTag<?>> sideOutputTags;
+
+    private transient Map<String, TypeSerializer<Row>> sideOutputSerializers;
+
     public AbstractDataStreamPythonFunctionOperator(
             Configuration config,
             DataStreamPythonFunctionInfo pythonFunctionInfo,
             TypeInformation<OUT> outputTypeInfo) {
         super(config);
         this.pythonFunctionInfo = Preconditions.checkNotNull(pythonFunctionInfo);
         this.outputTypeInfo = Preconditions.checkNotNull(outputTypeInfo);
+        sideOutputTags = new HashMap<>();

Review Comment:
   Move it into open?



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java:
##########
@@ -47,6 +49,16 @@ public void collect(Row runnerOutput) {
         collector.collect((OUT) runnerOutput.getField(1));
     }
 
+    @SuppressWarnings("unchecked")
+    public <X> void collect(Row runnerOutput, OutputTag<X> outputTag) {
+        long ts = (long) runnerOutput.getField(0);
+        if (ts != Long.MIN_VALUE) {
+            collector.collect(outputTag, new StreamRecord<>((X) (runnerOutput.getField(1)), ts));

Review Comment:
   Create a reusable instance variable to avoid allocating a new StreamRecord for each record.
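
A minimal sketch of the reuse pattern suggested here. `MutableRecord` and `ReusingCollector` are hypothetical stand-ins written for this illustration, not Flink classes. The sketch assumes the downstream consumer does not keep a reference to the wrapper after `accept` returns, since the same instance is overwritten on the next call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class ReusableRecordSketch {

    /** Hypothetical mutable wrapper around a value and its timestamp. */
    static final class MutableRecord<T> {
        private T value;
        private long timestamp;

        /** Overwrites the content in place and returns this same instance, re-typed. */
        @SuppressWarnings("unchecked")
        <X> MutableRecord<X> replace(X newValue, long newTimestamp) {
            MutableRecord<X> self = (MutableRecord<X>) this;
            self.value = newValue;
            self.timestamp = newTimestamp;
            return self;
        }

        T getValue() {
            return value;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    /** Hypothetical collector that allocates one wrapper and reuses it for every element. */
    static final class ReusingCollector {
        private final MutableRecord<Object> reusable = new MutableRecord<>();
        private final Consumer<MutableRecord<?>> downstream;

        ReusingCollector(Consumer<MutableRecord<?>> downstream) {
            this.downstream = downstream;
        }

        <X> void collect(X value, long timestamp) {
            // No per-element allocation: the single wrapper is refilled and handed on.
            downstream.accept(reusable.replace(value, timestamp));
        }
    }

    /** Emits all values and returns how many distinct wrapper instances downstream saw. */
    public static int distinctWrappers(List<String> values, List<String> seen) {
        Set<Object> identities =
                Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
        ReusingCollector collector = new ReusingCollector(record -> {
            identities.add(record);
            // Copy the content out immediately; the wrapper is reused afterwards.
            seen.add(record.getValue() + "@" + record.getTimestamp());
        });
        long ts = 0;
        for (String v : values) {
            collector.collect(v, ts++);
        }
        return identities.size();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int wrappers = distinctWrappers(Arrays.asList("a", "b", "c"), seen);
        System.out.println(seen + " via " + wrappers + " wrapper instance(s)");
    }
}
```

The trade-off is that reuse removes one allocation per record on a hot path, at the cost of requiring every consumer to copy what it needs before returning.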



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java:
##########
@@ -47,6 +49,16 @@ public void collect(Row runnerOutput) {
         collector.collect((OUT) runnerOutput.getField(1));
     }
 
+    @SuppressWarnings("unchecked")
+    public <X> void collect(Row runnerOutput, OutputTag<X> outputTag) {

Review Comment:
   ```suggestion
       public <X> void collect(OutputTag<X> outputTag, Row runnerOutput) {
   ```



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -163,24 +161,89 @@ protected void buildTransforms(RunnerApi.Components.Builder componentsBuilder) {
                 transformBuilder.putInputs(MAIN_INPUT_NAME, COLLECTION_PREFIX + (i - 1));
             }
 
-            // prepare outputs
-            if (i == userDefinedDataStreamFunctions.size()) {
-                transformBuilder.putOutputs(MAIN_OUTPUT_NAME, OUTPUT_COLLECTION_ID);
-            } else {
-                transformBuilder.putOutputs(MAIN_OUTPUT_NAME, COLLECTION_PREFIX + i);
-
-                componentsBuilder
-                        .putPcollections(
-                                COLLECTION_PREFIX + i,
-                                RunnerApi.PCollection.newBuilder()
-                                        .setWindowingStrategyId(WINDOW_STRATEGY)
-                                        .setCoderId(CODER_PREFIX + i)
-                                        .build())
-                        .putCoders(CODER_PREFIX + i, createCoderProto(inputCoderDescriptor));
+            // prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1
+                    && sideOutputCoderDescriptors != null) {
+                for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry :
+                        sideOutputCoderDescriptors.entrySet()) {
+                    String reviseCollectionId = COLLECTION_PREFIX + "revise-" + entry.getKey();
+                    String reviseCoderId = CODER_PREFIX + "revise-" + entry.getKey();
+                    transformBuilder.putOutputs(entry.getKey(), reviseCollectionId);
+                    componentsBuilder
+                            .putPcollections(
+                                    reviseCollectionId,
+                                    RunnerApi.PCollection.newBuilder()
+                                            .setWindowingStrategyId(WINDOW_STRATEGY)
+                                            .setCoderId(reviseCoderId)
+                                            .build())
+                            .putCoders(reviseCoderId, createCoderProto(inputCoderDescriptor));
+                }
             }
+            // prepare outputs
+            transformBuilder.putOutputs(MAIN_OUTPUT_NAME, COLLECTION_PREFIX + i);
+            componentsBuilder
+                    .putPcollections(
+                            COLLECTION_PREFIX + i,
+                            RunnerApi.PCollection.newBuilder()
+                                    .setWindowingStrategyId(WINDOW_STRATEGY)
+                                    .setCoderId(CODER_PREFIX + i)
+                                    .build())
+                    .putCoders(CODER_PREFIX + i, createCoderProto(inputCoderDescriptor));
 
             componentsBuilder.putTransforms(transformName, transformBuilder.build());
         }
+
+        // Add REVISE_OUTPUT transformation for side outputs
+        if (sideOutputCoderDescriptors != null) {
+            for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry :
+                    sideOutputCoderDescriptors.entrySet()) {
+                String reviseTransformId = TRANSFORM_ID_PREFIX + "revise-" + entry.getKey();
+                String reviseCollectionId = COLLECTION_PREFIX + "revise-" + entry.getKey();
+                String outputCollectionId = entry.getKey();
+                componentsBuilder.putTransforms(
+                        reviseTransformId,
+                        buildReviseTransform(
+                                reviseTransformId, reviseCollectionId, outputCollectionId));
+            }
+        }
+        // Add REVISE_OUTPUT transformation for main output
+        String reviseTransformId = TRANSFORM_ID_PREFIX + "revise";
+        componentsBuilder.putTransforms(
+                reviseTransformId,
+                buildReviseTransform(
+                        reviseTransformId,
+                        COLLECTION_PREFIX + (userDefinedDataStreamFunctions.size() - 1),
+                        OUTPUT_COLLECTION_ID));
+    }
+
+    private RunnerApi.PTransform buildReviseTransform(

Review Comment:
   There is a lot of code duplicated from buildTransforms. Could we abstract it a bit and remove the duplication in buildTransforms?



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -444,6 +450,21 @@ private ExecutableStage createExecutableStage(RunnerApi.Environment environment)
                         .putCoders(OUTPUT_CODER_ID, createCoderProto(outputCoderDescriptor))
                         .putCoders(WINDOW_CODER_ID, getWindowCoderProto());
 
+        if (sideOutputCoderDescriptors != null) {

Review Comment:
   Could we use an empty map instead? That would avoid the null checks that appear in many places and make the code more readable.
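
A minimal sketch of the empty-map idea, under stated assumptions: `Runner`, `outputCollectionIds`, and the `"output-"` naming are hypothetical, and plain strings stand in for the coder descriptors. The constructor normalizes `null` once, so later code can iterate unconditionally and use `isEmpty()` where it needs to know whether side outputs exist.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class EmptyMapSketch {

    /** Hypothetical runner holding one coder name per side output tag. */
    static final class Runner {
        private final Map<String, String> sideOutputCoders;

        Runner(Map<String, String> sideOutputCoders) {
            // Normalize once so that no later code path has to check for null.
            this.sideOutputCoders =
                    sideOutputCoders == null
                            ? Collections.<String, String>emptyMap()
                            : sideOutputCoders;
        }

        /** Replaces "field != null" style checks where presence matters. */
        boolean hasSideOutputs() {
            return !sideOutputCoders.isEmpty();
        }

        /** Iterating an empty map is a no-op, so no guard is needed here. */
        List<String> outputCollectionIds() {
            List<String> ids = new ArrayList<>();
            ids.add("output");
            for (String tag : sideOutputCoders.keySet()) {
                ids.add("output-" + tag);
            }
            return ids;
        }
    }

    public static List<String> collectionIdsFor(Map<String, String> coders) {
        return new Runner(coders).outputCollectionIds();
    }

    public static boolean hasSideOutputs(Map<String, String> coders) {
        return new Runner(coders).hasSideOutputs();
    }

    public static void main(String[] args) {
        System.out.println(collectionIdsFor(null));
        System.out.println(collectionIdsFor(Collections.singletonMap("late", "late-coder")));
    }
}
```

Any logic that currently branches on the field being `null` would need to branch on `isEmpty()` instead for this to be a drop-in change.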





[GitHub] [flink] Vancior commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r851850261


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -444,6 +450,21 @@ private ExecutableStage createExecutableStage(RunnerApi.Environment environment)
                         .putCoders(OUTPUT_CODER_ID, createCoderProto(outputCoderDescriptor))
                         .putCoders(WINDOW_CODER_ID, getWindowCoderProto());
 
+        if (sideOutputCoderDescriptors != null) {

Review Comment:
   Some logic in `createExecutableStage` and `createValueOnlyWireCoderSetting` relies on this field.


