Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/04 13:22:28 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #14312: [FLINK-20491] Support Broadcast State in BATCH execution mode

dawidwys commented on a change in pull request #14312:
URL: https://github.com/apache/flink/pull/14312#discussion_r551228929



##########
File path: docs/dev/datastream_execution_mode.md
##########
@@ -237,6 +237,35 @@ next key.
 See [FLIP-140](https://cwiki.apache.org/confluence/x/kDh4CQ) for background
 information on this.
 
+### Order of Processing
+
+The order in which records are processed in operators or user-defined functions (UDFs) can differ between `BATCH` and `STREAMING` execution.
+
+In `STREAMING` mode, user-defined functions should not make any assumptions about incoming records' order.
+Data is processed as soon as it arrives.
+
+In `BATCH` execution mode, there are some operations where Flink guarantees order. 
+The ordering can be a side effect of the particular task scheduling,
+network shuffle, and state backend (see above), or a conscious choice by the system.
+
+There are three general types of input that we can differentiate:
+
+- _broadcast input_: input from a broadcast stream (see also [Broadcast
+  State]({% link dev/stream/state/broadcast_state.md %}))
+- _regular input_: input that isn't any of the above types of input

Review comment:
       shouldn't it be below the keyed input?
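
   For context, here is a minimal sketch of the behaviour this section describes, assuming the DataStream Broadcast State API and `RuntimeExecutionMode.BATCH` (the class name and values are made up): in `BATCH` mode the broadcast side of a connected stream is consumed before the regular side, so the broadcast state is fully populated by the time `processElement` runs.

   ```java
   import org.apache.flink.api.common.RuntimeExecutionMode;
   import org.apache.flink.api.common.state.MapStateDescriptor;
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.streaming.api.datastream.BroadcastStream;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
   import org.apache.flink.util.Collector;

   public class BroadcastOrderingSketch {

       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           // In BATCH mode the broadcast input is consumed before the regular input.
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);

           MapStateDescriptor<String, String> descriptor =
                   new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

           BroadcastStream<String> configSide = env.fromElements(">>").broadcast(descriptor);
           DataStream<String> regularSide = env.fromElements("a", "b", "c");

           regularSide
                   .connect(configSide)
                   .process(
                           new BroadcastProcessFunction<String, String, String>() {

                               @Override
                               public void processBroadcastElement(
                                       String value, Context ctx, Collector<String> out)
                                       throws Exception {
                                   // Runs for the entire broadcast side first.
                                   ctx.getBroadcastState(descriptor).put("prefix", value);
                               }

                               @Override
                               public void processElement(
                                       String value, ReadOnlyContext ctx, Collector<String> out)
                                       throws Exception {
                                   // By now the broadcast state is fully populated.
                                   out.collect(ctx.getBroadcastState(descriptor).get("prefix") + value);
                               }
                           })
                   .print();

           env.execute("broadcast-ordering-sketch");
       }
   }
   ```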

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
##########
@@ -318,23 +346,40 @@ private InputStatus addNextToQueue(HeadElement reuse, DataOutput<IN> output) thr
      * all sorting inputs. Should be used by the {@link StreamInputProcessor} to choose the next
      * input to consume from.
      */
-    private static class InputSelector implements InputSelectable {
+    private static class InputSelector implements InputSelectable, BoundedMultiInput {
 
         private final CommonContext commonContext;
-        private final int numberOfInputs;
+        private final int numInputs;
+        private final ConcurrentLinkedQueue<Integer> passThroughInputsIndices;

Review comment:
       I'd rather not use the concurrent collection, unless you are sure it is required. If it is, I'd add a comment explaining why.
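
   If the indices are only ever touched from the task's single (mailbox) thread, a plain queue would do; a hedged sketch of that alternative (hypothetical class and field names, not the actual `MultiInputSortingDataInput` code):

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   /**
    * Hypothetical sketch: if only one thread ever adds/polls the pass-through input
    * indices, a non-concurrent queue is sufficient. If concurrent access really is
    * needed, keep the ConcurrentLinkedQueue and document the competing threads.
    */
   public final class PassThroughIndices {

       // Single-threaded access assumed.
       private final Queue<Integer> passThroughInputIndices = new ArrayDeque<>();

       void markPassThrough(int inputIndex) {
           passThroughInputIndices.add(inputIndex);
       }

       Integer nextPassThroughInput() {
           return passThroughInputIndices.poll();
       }

       public static void main(String[] args) {
           PassThroughIndices indices = new PassThroughIndices();
           indices.markPassThrough(0);
           indices.markPassThrough(2);
           System.out.println(indices.nextPassThroughInput()); // prints 0
       }
   }
   ```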

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
##########
@@ -107,29 +112,38 @@ private MultiInputSortingDataInput(
      */
     public static class SelectableSortingInputs {

Review comment:
       I think it would make sense to run the benchmarks over this change to see whether there is a performance penalty, especially in https://github.com/apache/flink-benchmarks/blob/0843ba6c8e2e8c856d4da333e546c3077220c7c8/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java#L157
   
   (there are instructions on how to run the benchmarks at the link below)
   https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115511847

##########
File path: flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
##########
@@ -180,6 +191,107 @@ public void batchSumSingleResultPerKey() throws Exception {
         }
     }
 
+    /** Verifies that all broadcast input is processed before keyed input. */
+    @Test
+    public void batchKeyedBroadcastExecution() throws Exception {

Review comment:
       nit: How about we test with at least two keys?
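
   For illustration, a hedged sketch of what a two-key variant could look like (hypothetical class name and data, not the actual ITCase; it assumes `executeAndCollect` and `RuntimeExecutionMode.BATCH`):

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   import org.apache.flink.api.common.RuntimeExecutionMode;
   import org.apache.flink.api.common.state.MapStateDescriptor;
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.streaming.api.datastream.BroadcastStream;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.datastream.KeyedStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
   import org.apache.flink.util.CloseableIterator;
   import org.apache.flink.util.Collector;

   public class TwoKeyBroadcastBatchSketch {

       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);

           MapStateDescriptor<String, String> rules =
                   new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

           // Two distinct keys, "a" and "b", so the keyed path is exercised for more than one key.
           KeyedStream<Tuple2<String, Integer>, String> keyed =
                   env.fromElements(
                                   Tuple2.of("a", 1), Tuple2.of("b", 2),
                                   Tuple2.of("a", 3), Tuple2.of("b", 4))
                           .keyBy(value -> value.f0);

           BroadcastStream<String> broadcast = env.fromElements("x10").broadcast(rules);

           DataStream<String> result =
                   keyed.connect(broadcast)
                           .process(
                                   new KeyedBroadcastProcessFunction<
                                           String, Tuple2<String, Integer>, String, String>() {

                                       @Override
                                       public void processBroadcastElement(
                                               String rule, Context ctx, Collector<String> out)
                                               throws Exception {
                                           ctx.getBroadcastState(rules).put("rule", rule);
                                       }

                                       @Override
                                       public void processElement(
                                               Tuple2<String, Integer> value,
                                               ReadOnlyContext ctx,
                                               Collector<String> out)
                                               throws Exception {
                                           // Every key should see the broadcast rule that was
                                           // processed before any keyed input.
                                           out.collect(
                                                   value.f0 + ":" + ctx.getBroadcastState(rules).get("rule"));
                                       }
                                   });

           List<String> collected = new ArrayList<>();
           try (CloseableIterator<String> it = result.executeAndCollect()) {
               it.forEachRemaining(collected::add);
           }
           if (!collected.containsAll(Arrays.asList("a:x10", "b:x10"))) {
               throw new AssertionError("Unexpected output: " + collected);
           }
       }
   }
   ```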

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -142,6 +148,7 @@ public static StreamMultipleInputProcessor create(
                                                             idx, userClassloader))
                                     .toArray(TypeSerializer[]::new),
                             streamConfig.getStateKeySerializer(userClassloader),
+                            new StreamTaskInput[0],

Review comment:
       This is wrong, right?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
##########
@@ -90,24 +94,51 @@
 
         InputSelectable inputSelectable =
                 streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;
-        if (streamConfig.shouldSortInputs()) {
+
+        // this is a bit verbose because we're manually handling input1 and input2
+        // TODO: extract method
+        StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
+        boolean input1IsSorted = requiresSorting(inputConfigs[0]);
+        boolean input2IsSorted = requiresSorting(inputConfigs[1]);
+
+        if (input1IsSorted || input2IsSorted) {
+            // as soon as one input requires sorting we need to treat all inputs differently, to
+            // make sure that pass-through inputs have precedence
 
             if (inputSelectable != null) {
                 throw new IllegalStateException(
                         "The InputSelectable interface is not supported with sorting inputs");
             }
 
-            @SuppressWarnings("unchecked")
+            List<StreamTaskInput<?>> sortedTaskInputs = new ArrayList<>();
+            List<StreamTaskInput<?>> passThroughTaskInputs = new ArrayList<>();
+            int input1Index = 0;
+            int input2Index;
+            if (input1IsSorted) {
+                sortedTaskInputs.add(input1);
+            } else {
+                passThroughTaskInputs.add(input1);
+            }
+            if (input2IsSorted) {
+                input2Index = sortedTaskInputs.size();
+                sortedTaskInputs.add(input2);
+            } else {
+                input2Index = passThroughTaskInputs.size();
+                passThroughTaskInputs.add(input2);
+            }
+
+            @SuppressWarnings({"unchecked", "rawtypes"})

Review comment:
       nit: my IDE complains that the `"rawtypes"` suppression is unnecessary.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractBroadcastStateTransformation.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for Broadcast State transformations. In a nutshell, this transformation allows to take
+ * a broadcast (non-keyed) stream, connect it with another keyed or non-keyed stream, and apply a
+ * function on the resulting connected stream. This function will have access to all the elements
+ * that belong to the non-keyed, broadcasted side, as this is kept in Flink's state.

Review comment:
       nit: Is the sentence correct? I think the function will have access to the state populated with the elements of the broadcast side, not necessarily to all the elements (or to the elements in general).
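
   A hedged sketch of the access pattern the javadoc is describing (hypothetical function, not from the PR): the non-broadcast side only gets a read-only view of the broadcast state; it never sees the broadcast elements themselves.

   ```java
   import org.apache.flink.api.common.state.MapStateDescriptor;
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
   import org.apache.flink.util.Collector;

   /** Hypothetical function: only the broadcast state is visible on the keyed side. */
   public class StateAccessSketch
           extends KeyedBroadcastProcessFunction<String, String, String, String> {

       static final MapStateDescriptor<String, String> DESCRIPTOR =
               new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.STRING);

       @Override
       public void processBroadcastElement(String value, Context ctx, Collector<String> out)
               throws Exception {
           // Broadcast elements are only observed here; what survives is whatever
           // this method chooses to put into the broadcast state.
           ctx.getBroadcastState(DESCRIPTOR).put(value, value.toUpperCase());
       }

       @Override
       public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
               throws Exception {
           // The keyed side reads the populated state, not the broadcast elements.
           out.collect(value + " -> " + ctx.getBroadcastState(DESCRIPTOR).get(value));
       }
   }
   ```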

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -246,28 +246,36 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
         return returnStream;
     }
 
-    private <OUT> BroadcastStateTransformation<IN1, IN2, OUT> getBroadcastStateTransformation(
+    @Internal
+    private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
             final String functionName,
-            final TypeInformation<OUT> outTypeInfo,
-            final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-        if (nonBroadcastStream instanceof KeyedStream) {
-            return BroadcastStateTransformation.forKeyedStream(
-                    functionName,
-                    (KeyedStream<IN1, ?>) nonBroadcastStream,
-                    broadcastStream,
-                    SimpleOperatorFactory.of(operator),
-                    outTypeInfo,
-                    environment.getParallelism());
-        } else {
-            return BroadcastStateTransformation.forNonKeyedStream(
-                    functionName,
-                    nonBroadcastStream,
-                    broadcastStream,
-                    SimpleOperatorFactory.of(operator),
-                    outTypeInfo,
-                    environment.getParallelism());
-        }
+            final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
+            final TypeInformation<OUT> outTypeInfo) {
+
+        // read the output type of the input Transforms to coax out errors about MissingTypeInfo
+        nonBroadcastStream.getType();
+        broadcastStream.getType();
+
+        KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
+
+        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
+                new KeyedBroadcastStateTransformation<>(
+                        functionName,

Review comment:
       nit: How about we inline the functionName(s) in both `transform` methods?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/TwoInputTransformationTranslator.java
##########
@@ -46,7 +47,11 @@
                 transformation.getStateKeySelector1() != null
                         && transformation.getStateKeySelector2() != null;
         if (isKeyed) {

Review comment:
       I think we should change it so that we apply sorting if any of the inputs is keyed.
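
   A hedged sketch of the suggested condition (hypothetical helper, not the actual translator code): the keyed path should be taken as soon as either key selector is present, rather than only when both are.

   ```java
   import org.apache.flink.api.java.functions.KeySelector;

   /** Hypothetical helper illustrating the suggested "any input keyed" check. */
   public final class KeyedInputCheck {

       /** The quoted code effectively requires both selectors; the suggestion is "either". */
       static boolean anyInputKeyed(KeySelector<?, ?> selector1, KeySelector<?, ?> selector2) {
           return selector1 != null || selector2 != null;
       }

       public static void main(String[] args) {
           KeySelector<String, String> keyedInput = value -> value;
           // A keyed first input combined with a non-keyed (e.g. broadcast) second input
           // should still take the keyed translation path.
           System.out.println(anyInputKeyed(keyedInput, null)); // prints true
       }
   }
   ```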

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
##########
@@ -90,24 +94,51 @@
 
         InputSelectable inputSelectable =
                 streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;
-        if (streamConfig.shouldSortInputs()) {
+
+        // this is a bit verbose because we're manually handling input1 and input2
+        // TODO: extract method
+        StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
+        boolean input1IsSorted = requiresSorting(inputConfigs[0]);
+        boolean input2IsSorted = requiresSorting(inputConfigs[1]);
+
+        if (input1IsSorted || input2IsSorted) {
+            // as soon as one input requires sorting we need to treat all inputs differently, to
+            // make sure that pass-through inputs have precedence
 
             if (inputSelectable != null) {
                 throw new IllegalStateException(
                         "The InputSelectable interface is not supported with sorting inputs");
             }
 
-            @SuppressWarnings("unchecked")
+            List<StreamTaskInput<?>> sortedTaskInputs = new ArrayList<>();
+            List<StreamTaskInput<?>> passThroughTaskInputs = new ArrayList<>();
+            int input1Index = 0;

Review comment:
       It is rather a matter of preference, but I'd move the index computation down to where it is needed. I found it a bit confusing here, and my IDE complains that on line 126 `passThroughTaskInputs.size()` is always `0`.
   
   We won't need this logic here if we change lines 151-160 to:
   ```java
   StreamTaskInput<?>[] sortedInputs = selectableSortingInputs.getSortedInputs();
   StreamTaskInput<?>[] passThroughInputs = selectableSortingInputs.getPassThroughInputs();
   if (input1IsSorted) {
       input1 = toTypedInput(sortedInputs[0]);
   } else {
       input1 = toTypedInput(passThroughInputs[0]);
   }
   if (input2IsSorted) {
       input2 = toTypedInput(sortedInputs[sortedInputs.length - 1]);
   } else {
       input2 = toTypedInput(passThroughInputs[passThroughInputs.length - 1]);
   }
   ```
   
   We can base that on the assumption that `input1` will be first in the corresponding group while `input2` will be last.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org