Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/17 14:44:58 UTC

[GitHub] [flink] kl0u commented on a change in pull request #14312: [FLINK-20491] Support Broadcast State in BATCH execution mode

kl0u commented on a change in pull request #14312:
URL: https://github.com/apache/flink/pull/14312#discussion_r545134296



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -205,26 +212,46 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
 			final TypeInformation<OUT> outTypeInfo) {
 
 		Preconditions.checkNotNull(function);
-		Preconditions.checkArgument(!(nonBroadcastStream instanceof KeyedStream),
+		Preconditions.checkArgument(
+				!(nonBroadcastStream instanceof KeyedStream),
 				"A BroadcastProcessFunction can only be used on a non-keyed stream.");
 
-		TwoInputStreamOperator<IN1, IN2, OUT> operator =
-				new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
-		return transform("Co-Process-Broadcast", outTypeInfo, operator);
+		return transform("Co-Process-Broadcast", function, outTypeInfo);
 	}
 
 	@Internal
 	private <OUT> SingleOutputStreamOperator<OUT> transform(
 			final String functionName,
-			final TypeInformation<OUT> outTypeInfo,
-			final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+			BroadcastProcessFunction<IN1, IN2, OUT> userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -205,26 +212,46 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
 			final TypeInformation<OUT> outTypeInfo) {
 
 		Preconditions.checkNotNull(function);
-		Preconditions.checkArgument(!(nonBroadcastStream instanceof KeyedStream),
+		Preconditions.checkArgument(
+				!(nonBroadcastStream instanceof KeyedStream),
 				"A BroadcastProcessFunction can only be used on a non-keyed stream.");
 
-		TwoInputStreamOperator<IN1, IN2, OUT> operator =
-				new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
-		return transform("Co-Process-Broadcast", outTypeInfo, operator);
+		return transform("Co-Process-Broadcast", function, outTypeInfo);
 	}
 
 	@Internal
 	private <OUT> SingleOutputStreamOperator<OUT> transform(
 			final String functionName,
-			final TypeInformation<OUT> outTypeInfo,
-			final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+			BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+			final TypeInformation<OUT> outTypeInfo) {
 
 		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
 		nonBroadcastStream.getType();
 		broadcastStream.getType();
 
 		final BroadcastStateTransformation<IN1, IN2, OUT> transformation =
-				getBroadcastStateTransformation(functionName, outTypeInfo, operator);
+				getBroadcastStateTransformation(functionName, clean(userFunction), outTypeInfo);
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final SingleOutputStreamOperator<OUT> returnStream =
+				new SingleOutputStreamOperator(environment, transformation);
+
+		getExecutionEnvironment().addOperator(transformation);
+		return returnStream;
+	}
+
+	@Internal
+	private <OUT> SingleOutputStreamOperator<OUT> transform(
+			final String functionName,
+			KeyedBroadcastProcessFunction<?, IN1, IN2, OUT> userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -236,26 +263,32 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
 
 	private <OUT> BroadcastStateTransformation<IN1, IN2, OUT> getBroadcastStateTransformation(
 			final String functionName,
-			final TypeInformation<OUT> outTypeInfo,
-			final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-		if (nonBroadcastStream instanceof KeyedStream) {
-			return BroadcastStateTransformation.forKeyedStream(
-					functionName,
-					(KeyedStream<IN1, ?>) nonBroadcastStream,
-					broadcastStream,
-					SimpleOperatorFactory.of(operator),
-					outTypeInfo,
-					environment.getParallelism());
-		} else {
-			return BroadcastStateTransformation.forNonKeyedStream(
-					functionName,
-					nonBroadcastStream,
-					broadcastStream,
-					SimpleOperatorFactory.of(operator),
-					outTypeInfo,
-					environment.getParallelism());
-		}
+			BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+			final TypeInformation<OUT> outTypeInfo) {
+
+		return BroadcastStateTransformation.forStream(
+				functionName,
+				nonBroadcastStream,
+				broadcastStream,
+				userFunction,
+				broadcastStateDescriptors,
+				outTypeInfo,
+				environment.getParallelism());
+	}
+
+	private <KS, OUT> KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> getBroadcastStateTransformation(
+			final String functionName,
+			KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/BroadcastStateTransformation.java
##########
@@ -53,111 +50,91 @@
 @Internal
 public class BroadcastStateTransformation<IN1, IN2, OUT> extends PhysicalTransformation<OUT> {

Review comment:
       The two broadcast transformations could share a base class that holds everything except the key-related fields and the user function.
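
   A rough sketch of what such a base class might look like. Everything here is a stand-in so the snippet compiles on its own: `SketchTransformation` replaces Flink's `Transformation`, the state descriptors are modelled as plain strings, and all class names are hypothetical, not a proposal for the final naming.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Stand-in for Flink's Transformation (hypothetical, kept minimal).
class SketchTransformation {
    private final String name;

    SketchTransformation(String name) {
        this.name = Objects.requireNonNull(name);
    }

    String getName() {
        return name;
    }
}

// Hypothetical base class: owns the two inputs and the descriptor names,
// but neither the user function nor anything key-related.
abstract class AbstractBroadcastStateSketch extends SketchTransformation {
    private final SketchTransformation inputStream;
    private final SketchTransformation broadcastStream;
    private final List<String> broadcastStateDescriptors;

    protected AbstractBroadcastStateSketch(
            String name,
            SketchTransformation inputStream,
            SketchTransformation broadcastStream,
            List<String> broadcastStateDescriptors) {
        super(name);
        this.inputStream = Objects.requireNonNull(inputStream);
        this.broadcastStream = Objects.requireNonNull(broadcastStream);
        this.broadcastStateDescriptors = new ArrayList<>(broadcastStateDescriptors);
    }

    // Written once here instead of being duplicated in both subclasses.
    List<SketchTransformation> getInputs() {
        List<SketchTransformation> inputs = new ArrayList<>();
        inputs.add(inputStream);
        inputs.add(broadcastStream);
        return inputs;
    }

    List<String> getBroadcastStateDescriptors() {
        return broadcastStateDescriptors;
    }
}

// Non-keyed variant: adds only the user function (F is a placeholder type).
final class NonKeyedBroadcastSketch<F> extends AbstractBroadcastStateSketch {
    private final F userFunction;

    NonKeyedBroadcastSketch(
            String name,
            SketchTransformation inputStream,
            SketchTransformation broadcastStream,
            List<String> broadcastStateDescriptors,
            F userFunction) {
        super(name, inputStream, broadcastStream, broadcastStateDescriptors);
        this.userFunction = Objects.requireNonNull(userFunction);
    }

    F getUserFunction() {
        return userFunction;
    }
}

// Keyed variant: adds the user function plus the key selector.
final class KeyedBroadcastSketch<KS, IN, F> extends AbstractBroadcastStateSketch {
    private final F userFunction;
    private final Function<IN, KS> keySelector;

    KeyedBroadcastSketch(
            String name,
            SketchTransformation inputStream,
            SketchTransformation broadcastStream,
            List<String> broadcastStateDescriptors,
            F userFunction,
            Function<IN, KS> keySelector) {
        super(name, inputStream, broadcastStream, broadcastStateDescriptors);
        this.userFunction = Objects.requireNonNull(userFunction);
        this.keySelector = Objects.requireNonNull(keySelector);
    }

    F getUserFunction() {
        return userFunction;
    }

    Function<IN, KS> getKeySelector() {
        return keySelector;
    }
}
```

   With this shape, `getInputs()` and the descriptor getter live in one place, and each subclass only declares what is specific to it.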

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -236,26 +263,32 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
 
 	private <OUT> BroadcastStateTransformation<IN1, IN2, OUT> getBroadcastStateTransformation(
 			final String functionName,
-			final TypeInformation<OUT> outTypeInfo,
-			final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-		if (nonBroadcastStream instanceof KeyedStream) {
-			return BroadcastStateTransformation.forKeyedStream(
-					functionName,
-					(KeyedStream<IN1, ?>) nonBroadcastStream,
-					broadcastStream,
-					SimpleOperatorFactory.of(operator),
-					outTypeInfo,
-					environment.getParallelism());
-		} else {
-			return BroadcastStateTransformation.forNonKeyedStream(
-					functionName,
-					nonBroadcastStream,
-					broadcastStream,
-					SimpleOperatorFactory.of(operator),
-					outTypeInfo,
-					environment.getParallelism());
-		}
+			BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+			final TypeInformation<OUT> outTypeInfo) {
+
+		return BroadcastStateTransformation.forStream(
+				functionName,
+				nonBroadcastStream,
+				broadcastStream,
+				userFunction,
+				broadcastStateDescriptors,
+				outTypeInfo,
+				environment.getParallelism());
+	}
+

Review comment:
       Now the two `getBroadcastStateTransformation()` methods have no additional logic and each has a single call site, so why not inline them at their call sites?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/BroadcastStateTransformation.java
##########
@@ -53,111 +50,91 @@
 @Internal
 public class BroadcastStateTransformation<IN1, IN2, OUT> extends PhysicalTransformation<OUT> {
 
-	private final Transformation<IN1> nonBroadcastStream;
+	private final BroadcastProcessFunction<IN1, IN2, OUT> userFunction;
 
-	private final Transformation<IN2> broadcastStream;
+	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
 
-	private final StreamOperatorFactory<OUT> operatorFactory;
+	private final Transformation<IN1> inputStream;
 
-	private final TypeInformation<?> stateKeyType;
+	private final Transformation<IN2> broadcastStream;
 
-	private final KeySelector<IN1, ?> keySelector;
+	private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
 
 	private BroadcastStateTransformation(
 			final String name,
 			final Transformation<IN1> inputStream,
 			final Transformation<IN2> broadcastStream,
-			final StreamOperatorFactory<OUT> operatorFactory,
-			@Nullable final TypeInformation<?> keyType,
-			@Nullable final KeySelector<IN1, ?> keySelector,
+			final BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+			final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors,
 			final TypeInformation<OUT> outTypeInfo,
 			final int parallelism) {
 		super(name, outTypeInfo, parallelism);
-		this.nonBroadcastStream = checkNotNull(inputStream);
+		this.inputStream = checkNotNull(inputStream);
 		this.broadcastStream = checkNotNull(broadcastStream);
-		this.operatorFactory = checkNotNull(operatorFactory);
+		this.userFunction = userFunction;
+		this.broadcastStateDescriptors = broadcastStateDescriptors;
 
-		this.stateKeyType = keyType;
-		this.keySelector = keySelector;
-		updateManagedMemoryStateBackendUseCase(keySelector != null);
+		updateManagedMemoryStateBackendUseCase(false /* not keyed */);
 	}
 
 	public Transformation<IN2> getBroadcastStream() {
 		return broadcastStream;
 	}
 
 	public Transformation<IN1> getNonBroadcastStream() {
-		return nonBroadcastStream;
+		return inputStream;
 	}
 
-	public StreamOperatorFactory<OUT> getOperatorFactory() {
-		return operatorFactory;
+	public BroadcastProcessFunction<IN1, IN2, OUT> getUserFunction() {
+		return userFunction;
 	}
 
-	public TypeInformation<?> getStateKeyType() {
-		return stateKeyType;
+	public List<MapStateDescriptor<?, ?>> getBroadcastStateDescriptors() {
+		return broadcastStateDescriptors;
 	}
 
-	public KeySelector<IN1, ?> getKeySelector() {
-		return keySelector;
+	public ChainingStrategy getChainingStrategy() {
+		return chainingStrategy;
 	}
 
 	@Override
-	public void setChainingStrategy(ChainingStrategy strategy) {
-		this.operatorFactory.getChainingStrategy();
+	public void setChainingStrategy(ChainingStrategy chainingStrategy) {
+		this.chainingStrategy = checkNotNull(chainingStrategy);
 	}
 
 	@Override
 	public List<Transformation<?>> getTransitivePredecessors() {
 		final List<Transformation<?>> predecessors = new ArrayList<>();
 		predecessors.add(this);
-		predecessors.add(nonBroadcastStream);
+		predecessors.add(inputStream);
 		predecessors.add(broadcastStream);
 		return predecessors;
 	}
 
 	@Override
 	public List<Transformation<?>> getInputs() {
 		final List<Transformation<?>> predecessors = new ArrayList<>();
-		predecessors.add(nonBroadcastStream);
+		predecessors.add(inputStream);
 		predecessors.add(broadcastStream);
 		return predecessors;
 	}
 
 	// ------------------------------- Static Constructors -------------------------------
 
-	public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, OUT> forNonKeyedStream(
-			final String name,
-			final DataStream<IN1> nonBroadcastStream,
-			final BroadcastStream<IN2> broadcastStream,
-			final StreamOperatorFactory<OUT> operatorFactory,
-			final TypeInformation<OUT> outTypeInfo,
-			final int parallelism) {
-		return new BroadcastStateTransformation<>(
-				name,
-				checkNotNull(nonBroadcastStream).getTransformation(),
-				checkNotNull(broadcastStream).getTransformation(),
-				operatorFactory,
-				null,
-				null,
-				outTypeInfo,
-				parallelism);
-	}
-
-	public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, OUT> forKeyedStream(
+	public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, OUT> forStream(

Review comment:
       Now that we have two separate classes, we can make the constructor `public` and remove the static constructors both here and in the keyed alternative.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html">
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends PhysicalTransformation<OUT> {
+
+	private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> userFunction;
+
+	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+	private final Transformation<IN1> inputStream;
+
+	private final Transformation<IN2> broadcastStream;
+
+	private final TypeInformation<?> stateKeyType;
+
+	private final KeySelector<IN1, ?> keySelector;

Review comment:
       Same here.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html">
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends PhysicalTransformation<OUT> {
+
+	private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> userFunction;
+
+	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+	private final Transformation<IN1> inputStream;
+
+	private final Transformation<IN2> broadcastStream;
+
+	private final TypeInformation<?> stateKeyType;
+
+	private final KeySelector<IN1, ?> keySelector;
+
+	private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+
+	private KeyedBroadcastStateTransformation(
+			final String name,
+			final Transformation<IN1> inputStream,
+			final Transformation<IN2> broadcastStream,
+			final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> userFunction,
+			final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors,
+			final TypeInformation<?> keyType,
+			final KeySelector<IN1, ?> keySelector,
+			final TypeInformation<OUT> outTypeInfo,
+			final int parallelism) {
+		super(name, outTypeInfo, parallelism);
+		this.inputStream = checkNotNull(inputStream);
+		this.broadcastStream = checkNotNull(broadcastStream);
+		this.userFunction = userFunction;
+		this.broadcastStateDescriptors = broadcastStateDescriptors;
+
+		this.stateKeyType = keyType;
+		this.keySelector = keySelector;
+		updateManagedMemoryStateBackendUseCase(true /* we have keyed state */);
+	}
+
+	public Transformation<IN2> getBroadcastStream() {
+		return broadcastStream;
+	}
+
+	public Transformation<IN1> getNonBroadcastStream() {
+		return inputStream;
+	}
+
+	public KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> getUserFunction() {
+		return userFunction;
+	}
+
+	public List<MapStateDescriptor<?, ?>> getBroadcastStateDescriptors() {
+		return broadcastStateDescriptors;
+	}
+
+	public TypeInformation<?> getStateKeyType() {
+		return stateKeyType;
+	}
+
+	public KeySelector<IN1, ?> getKeySelector() {
+		return keySelector;
+	}
+
+	public ChainingStrategy getChainingStrategy() {
+		return chainingStrategy;
+	}
+
+	@Override
+	public void setChainingStrategy(ChainingStrategy chainingStrategy) {
+		this.chainingStrategy = checkNotNull(chainingStrategy);
+	}
+
+	@Override
+	public List<Transformation<?>> getTransitivePredecessors() {
+		final List<Transformation<?>> predecessors = new ArrayList<>();
+		predecessors.add(this);
+		predecessors.add(inputStream);
+		predecessors.add(broadcastStream);
+		return predecessors;
+	}
+
+	@Override
+	public List<Transformation<?>> getInputs() {
+		final List<Transformation<?>> predecessors = new ArrayList<>();
+		predecessors.add(inputStream);
+		predecessors.add(broadcastStream);
+		return predecessors;
+	}
+
+	// ------------------------------- Static Constructors -------------------------------
+

Review comment:
       Now we can remove this and make the constructor `public`, right?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html">
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends PhysicalTransformation<OUT> {
+
+	private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> userFunction;
+
+	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+	private final Transformation<IN1> inputStream;
+
+	private final Transformation<IN2> broadcastStream;
+
+	private final TypeInformation<?> stateKeyType;

Review comment:
       Now this can use `KS` as its type argument instead of `?`, right? (For type safety.)
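
   To illustrate the type-safety point with a self-contained sketch: `TypeInfoSketch` below is a hypothetical stand-in for `TypeInformation`, `java.util.function.Function` stands in for `KeySelector`, and `KeyedFieldsSketch` is an invented holder class. The only thing it is meant to show is that tying both fields to `KS` lets the compiler reject a mismatched key type and key selector.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for TypeInformation<T>.
final class TypeInfoSketch<T> {
    private final Class<T> typeClass;

    TypeInfoSketch(Class<T> typeClass) {
        this.typeClass = Objects.requireNonNull(typeClass);
    }

    Class<T> getTypeClass() {
        return typeClass;
    }
}

// Both key-related fields share the KS type parameter instead of using '?'.
final class KeyedFieldsSketch<KS, IN> {
    private final TypeInfoSketch<KS> stateKeyType;
    private final Function<IN, KS> keySelector;

    KeyedFieldsSketch(TypeInfoSketch<KS> stateKeyType, Function<IN, KS> keySelector) {
        this.stateKeyType = Objects.requireNonNull(stateKeyType);
        this.keySelector = Objects.requireNonNull(keySelector);
    }

    TypeInfoSketch<KS> getStateKeyType() {
        return stateKeyType;
    }

    // The extracted key is statically known to be a KS, so no cast is needed.
    KS keyOf(IN element) {
        return keySelector.apply(element);
    }
}
```

   With wildcards, passing a `TypeInfoSketch<String>` together with a selector that returns `Integer` would compile; with `KS` on both fields it would not.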

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -735,4 +761,10 @@ public int hashCode() {
 			return inputEdge.hashCode();
 		}
 	}
+

Review comment:
       Why not put it as a method in `InputConfig` with a default implementation that returns `false`? I think this would make the code nicer, and it would also avoid checks like `inputConfig instanceof StreamConfig.NetworkInputConfig`.
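
   Something along these lines, as a minimal sketch. The interface and class names mirror the ones in `StreamConfig`, but they are redeclared here as stand-ins, and the method name `requiresSorting()` and the `sorted` flag are assumptions made for illustration only.

```java
// Stand-in for StreamConfig.InputConfig; the default covers every input
// type that does not care about sorting.
interface InputConfig {
    // Hypothetical method name; returns false unless an implementation overrides it.
    default boolean requiresSorting() {
        return false;
    }
}

// Inherits the default, so no extra code is needed here.
final class SourceInputConfig implements InputConfig {
}

// Only the network input overrides the default.
final class NetworkInputConfig implements InputConfig {
    private final boolean sorted;

    NetworkInputConfig(boolean sorted) {
        this.sorted = sorted;
    }

    @Override
    public boolean requiresSorting() {
        return sorted;
    }
}
```

   Callers can then ask any `InputConfig` directly, without an `instanceof` check followed by a cast.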

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -84,7 +84,7 @@
 
 	private String transformationUID;
 	private String userHash;
-	private boolean sortedInputs = false;
+	private Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();

Review comment:
       Why not make this `final`?
   
   Also, a nit-pick: I find it nicer to initialize fields in the constructor, but feel free to ignore this.
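
   For reference, a small sketch of both suggestions combined. `StreamNodeSketch` and its methods are hypothetical, and the `InputRequirement` enum is redeclared locally with assumed constants so the snippet stands alone.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Local stand-in for StreamConfig.InputRequirement; the constants are assumed.
enum InputRequirement {
    SORTED,
    PASS_THROUGH
}

final class StreamNodeSketch {
    // final: the reference never changes, only the map contents do.
    private final Map<Integer, InputRequirement> inputRequirements;

    StreamNodeSketch() {
        // Initialized in the constructor rather than at the declaration.
        this.inputRequirements = new HashMap<>();
    }

    void addInputRequirement(int inputIndex, InputRequirement requirement) {
        inputRequirements.put(inputIndex, requirement);
    }

    // Read-only view so callers cannot bypass addInputRequirement().
    Map<Integer, InputRequirement> getInputRequirements() {
        return Collections.unmodifiableMap(inputRequirements);
    }
}
```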

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BroadcastStateTransformationTranslator.java
##########
@@ -51,14 +69,18 @@
 		checkNotNull(transformation);
 		checkNotNull(context);
 
+		CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> operator = new CoBroadcastWithNonKeyedOperator<>(

Review comment:
       I like that we create the operator here, as it pushes the runtime-related stuff further down. In the future we could have a translator that produces, for example, something other than an operator.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -238,10 +231,12 @@ private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
 		}
 	}
 
+	// TODO: this method is a bit misleading, because what it really does is setting up

Review comment:
       Can't we rename it to something more descriptive?



