Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/18 08:15:51 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #19453: [FLINK-26477][python] Support side output in PyFlink DataStream API

dianfu commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r851915271


##########
flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java:
##########
@@ -249,6 +249,19 @@ private static ChainInfo chainWithInputIfPossible(
             }
         }
 
+        if (transform instanceof SideOutputTransformation) {

Review Comment:
   Why put this in the method chainWithInputIfPossible? That has two problems:
   - It seems to have nothing to do with the chaining optimization, so it doesn't make sense to put both into one method.
   - If chaining optimization is disabled (python.operator-chaining.enabled), this code will not be executed at all.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java:
##########
@@ -97,6 +122,37 @@ public boolean containsPartitionCustom() {
         return this.containsPartitionCustom;
     }
 
+    public void addSideOutputTag(OutputTag<?> outputTag) {
+        sideOutputTags.put(outputTag.getId(), outputTag);
+    }
+
+    protected Map<String, FlinkFnApi.CoderInfoDescriptor> createSideOutputCoderDescriptors() {
+        Map<String, FlinkFnApi.CoderInfoDescriptor> descriptorMap = new HashMap<>();
+        for (Map.Entry<String, OutputTag<?>> entry : sideOutputTags.entrySet()) {
+            descriptorMap.put(
+                    entry.getKey(),
+                    createRawTypeCoderInfoDescriptorProto(
+                            getSideOutputTypeInfo(entry.getValue()),
+                            FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE,
+                            false));
+        }
+        return descriptorMap;
+    }
+
+    protected OutputTag<?> getOutputTagById(String id) {
+        Preconditions.checkArgument(sideOutputTags.containsKey(id));
+        return sideOutputTags.get(id);
+    }
+
+    protected TypeInformation<Row> getSideOutputTypeInfo(OutputTag<?> outputTag) {

Review Comment:
   ```suggestion
       private TypeInformation<Row> getSideOutputTypeInfo(OutputTag<?> outputTag) {
   ```



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -131,6 +133,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor;
     protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor;
+    protected final Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors;

Review Comment:
   Personally I think it's fine. Side output could also be seen as an important abstraction of the execution mode.



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java:
##########
@@ -55,13 +63,30 @@
     /** The TypeInformation of output data. */
     private final TypeInformation<OUT> outputTypeInfo;
 
+    private final Map<String, OutputTag<?>> sideOutputTags;
+
+    private transient Map<String, TypeSerializer<Row>> sideOutputSerializers;
+
     public AbstractDataStreamPythonFunctionOperator(
             Configuration config,
             DataStreamPythonFunctionInfo pythonFunctionInfo,
             TypeInformation<OUT> outputTypeInfo) {
         super(config);
         this.pythonFunctionInfo = Preconditions.checkNotNull(pythonFunctionInfo);
         this.outputTypeInfo = Preconditions.checkNotNull(outputTypeInfo);
+        sideOutputTags = new HashMap<>();

Review Comment:
   Could we move this into open()?
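   For context, Flink operators are serialized when the job graph is shipped to the cluster, so transient state is conventionally created in open() rather than in the constructor. A minimal self-contained sketch of that pattern, using a simplified non-Flink stand-in class (all names here are hypothetical):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified stand-in for an operator: fields marked transient are not
   // serialized with the job graph, so they are (re)created in open().
   class OperatorSketch implements java.io.Serializable {
       private transient Map<String, String> sideOutputSerializers;

       public void open() {
           // Create transient state here rather than in the constructor, so it
           // exists after the operator is deserialized on a task manager.
           sideOutputSerializers = new HashMap<>();
       }

       public void register(String id, String serializer) {
           sideOutputSerializers.put(id, serializer);
       }

       public int size() {
           return sideOutputSerializers.size();
       }
   }
   ```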



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java:
##########
@@ -47,6 +49,16 @@ public void collect(Row runnerOutput) {
         collector.collect((OUT) runnerOutput.getField(1));
     }
 
+    @SuppressWarnings("unchecked")
+    public <X> void collect(Row runnerOutput, OutputTag<X> outputTag) {
+        long ts = (long) runnerOutput.getField(0);
+        if (ts != Long.MIN_VALUE) {
+            collector.collect(outputTag, new StreamRecord<>((X) (runnerOutput.getField(1)), ts));

Review Comment:
   Consider creating a reusable instance variable to avoid creating a new StreamRecord for each record.
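   As a sketch of the object-reuse pattern being suggested: Flink's StreamRecord does provide a replace(value, timestamp) method for exactly this purpose, but the classes below are simplified, hypothetical stand-ins rather than the real Flink API.

   ```java
   // Simplified stand-in for Flink's StreamRecord; the real class
   // (org.apache.flink.streaming.runtime.streamrecord.StreamRecord) offers a
   // similar replace(value, timestamp) method.
   class ReusableRecord<T> {
       private T value;
       private long timestamp;

       ReusableRecord<T> replace(T value, long timestamp) {
           this.value = value;
           this.timestamp = timestamp;
           return this;
       }

       T getValue() { return value; }
       long getTimestamp() { return timestamp; }
   }

   // Hypothetical collector holding one record instance that is refilled for
   // each element instead of allocating a new record on every collect() call.
   class ReusingCollector {
       private final ReusableRecord<Object> reusable = new ReusableRecord<>();

       ReusableRecord<Object> collect(Object value, long timestamp) {
           return reusable.replace(value, timestamp);
       }
   }
   ```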



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java:
##########
@@ -47,6 +49,16 @@ public void collect(Row runnerOutput) {
         collector.collect((OUT) runnerOutput.getField(1));
     }
 
+    @SuppressWarnings("unchecked")
+    public <X> void collect(Row runnerOutput, OutputTag<X> outputTag) {

Review Comment:
   ```suggestion
       public <X> void collect(OutputTag<X> outputTag, Row runnerOutput) {
   ```



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -163,24 +161,89 @@ protected void buildTransforms(RunnerApi.Components.Builder componentsBuilder) {
                 transformBuilder.putInputs(MAIN_INPUT_NAME, COLLECTION_PREFIX + (i - 1));
             }
 
-            // prepare outputs
-            if (i == userDefinedDataStreamFunctions.size()) {
-                transformBuilder.putOutputs(MAIN_OUTPUT_NAME, OUTPUT_COLLECTION_ID);
-            } else {
-                transformBuilder.putOutputs(MAIN_OUTPUT_NAME, COLLECTION_PREFIX + i);
-
-                componentsBuilder
-                        .putPcollections(
-                                COLLECTION_PREFIX + i,
-                                RunnerApi.PCollection.newBuilder()
-                                        .setWindowingStrategyId(WINDOW_STRATEGY)
-                                        .setCoderId(CODER_PREFIX + i)
-                                        .build())
-                        .putCoders(CODER_PREFIX + i, createCoderProto(inputCoderDescriptor));
+            // prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1
+                    && sideOutputCoderDescriptors != null) {
+                for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry :
+                        sideOutputCoderDescriptors.entrySet()) {
+                    String reviseCollectionId = COLLECTION_PREFIX + "revise-" + entry.getKey();
+                    String reviseCoderId = CODER_PREFIX + "revise-" + entry.getKey();
+                    transformBuilder.putOutputs(entry.getKey(), reviseCollectionId);
+                    componentsBuilder
+                            .putPcollections(
+                                    reviseCollectionId,
+                                    RunnerApi.PCollection.newBuilder()
+                                            .setWindowingStrategyId(WINDOW_STRATEGY)
+                                            .setCoderId(reviseCoderId)
+                                            .build())
+                            .putCoders(reviseCoderId, createCoderProto(inputCoderDescriptor));
+                }
             }
+            // prepare outputs
+            transformBuilder.putOutputs(MAIN_OUTPUT_NAME, COLLECTION_PREFIX + i);
+            componentsBuilder
+                    .putPcollections(
+                            COLLECTION_PREFIX + i,
+                            RunnerApi.PCollection.newBuilder()
+                                    .setWindowingStrategyId(WINDOW_STRATEGY)
+                                    .setCoderId(CODER_PREFIX + i)
+                                    .build())
+                    .putCoders(CODER_PREFIX + i, createCoderProto(inputCoderDescriptor));
 
             componentsBuilder.putTransforms(transformName, transformBuilder.build());
         }
+
+        // Add REVISE_OUTPUT transformation for side outputs
+        if (sideOutputCoderDescriptors != null) {
+            for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry :
+                    sideOutputCoderDescriptors.entrySet()) {
+                String reviseTransformId = TRANSFORM_ID_PREFIX + "revise-" + entry.getKey();
+                String reviseCollectionId = COLLECTION_PREFIX + "revise-" + entry.getKey();
+                String outputCollectionId = entry.getKey();
+                componentsBuilder.putTransforms(
+                        reviseTransformId,
+                        buildReviseTransform(
+                                reviseTransformId, reviseCollectionId, outputCollectionId));
+            }
+        }
+        // Add REVISE_OUTPUT transformation for main output
+        String reviseTransformId = TRANSFORM_ID_PREFIX + "revise";
+        componentsBuilder.putTransforms(
+                reviseTransformId,
+                buildReviseTransform(
+                        reviseTransformId,
+                        COLLECTION_PREFIX + (userDefinedDataStreamFunctions.size() - 1),
+                        OUTPUT_COLLECTION_ID));
+    }
+
+    private RunnerApi.PTransform buildReviseTransform(

Review Comment:
   There is a lot of duplicate code shared with buildTransforms. Could we abstract it a bit and remove the duplication in buildTransforms?
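   One possible factoring: extract the repeated putPcollections/putCoders sequence into a shared helper that both the main-output and side-output branches call. A minimal sketch, where the helper name and the simplified builder type are hypothetical stand-ins for the Beam RunnerApi builders:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified stand-in for RunnerApi.Components.Builder, holding just the
   // two registries the duplicated code writes to.
   class ComponentsSketch {
       final Map<String, String> pcollections = new HashMap<>();
       final Map<String, String> coders = new HashMap<>();

       // Shared helper: registers a PCollection together with its coder in one
       // call, replacing the block repeated in buildTransforms.
       void registerCollection(String collectionId, String coderId, String coderProto) {
           pcollections.put(collectionId, coderId);
           coders.put(coderId, coderProto);
       }
   }
   ```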



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -444,6 +450,21 @@ private ExecutableStage createExecutableStage(RunnerApi.Environment environment)
                         .putCoders(OUTPUT_CODER_ID, createCoderProto(outputCoderDescriptor))
                         .putCoders(WINDOW_CODER_ID, getWindowCoderProto());
 
+        if (sideOutputCoderDescriptors != null) {

Review Comment:
   Could we use an empty map instead? That would avoid the null checks that appear in many places and make the code more readable.
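   A minimal sketch of the suggested change, with the field and value types simplified for illustration (the real field holds FlinkFnApi.CoderInfoDescriptor values):

   ```java
   import java.util.Collections;
   import java.util.Map;

   class RunnerSketch {
       // Normalize to an empty map at construction so callers never need a
       // null check before iterating.
       private final Map<String, String> sideOutputCoderDescriptors;

       RunnerSketch(Map<String, String> descriptors) {
           this.sideOutputCoderDescriptors =
                   descriptors == null ? Collections.emptyMap() : descriptors;
       }

       // Iteration is safe unconditionally: an empty map yields no entries.
       int countDescriptors() {
           int n = 0;
           for (Map.Entry<String, String> entry : sideOutputCoderDescriptors.entrySet()) {
               n++;
           }
           return n;
       }
   }
   ```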



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org