You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 09:26:16 UTC

[GitHub] [flink-benchmarks] pnowojski commented on a change in pull request #5: [FLINK-19884] Add benchmarks for batch-style execution for bounded keyed streams

pnowojski commented on a change in pull request #5:
URL: https://github.com/apache/flink-benchmarks/pull/5#discussion_r514945951



##########
File path: pom.xml
##########
@@ -20,13 +20,28 @@ under the License.
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 
-	<groupId>org.apache.flink.benchmark</groupId>
-	<artifactId>flink-hackathon-benchmarks</artifactId>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>benchmark</artifactId>

Review comment:
       👍  😳 

##########
File path: src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SplittableIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/**
+ * An end to end test for sorted inputs for a keyed operator with bounded inputs.
+ */
+public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+
+	protected static final int RECORDS_PER_INVOCATION = 900_000;
+
+	public static void main(String[] args) throws RunnerException {
+		Options options = new OptionsBuilder()
+				.verbosity(VerboseMode.NORMAL)
+				.include(".*" + SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
+				.build();
+
+		new Runner(options).run();
+	}
+
+	@State(Thread)
+	public static class SortingInputContext extends FlinkEnvironmentContext {
+		@Param({"true", "false"})
+		public boolean useSingleKeyStateBackend;
+
+		@Override
+		protected Configuration createConfiguration() {
+			Configuration configuration = super.createConfiguration();
+			configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useSingleKeyStateBackend);
+			configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+			return configuration;
+		}
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void oneInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		SingleOutputStreamOperator<Long> counts = elements
+			.keyBy(element -> element.f0)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testTwoInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+		SingleOutputStreamOperator<Long> counts = elements1.connect(elements2)
+			.keyBy(
+				element -> element.f0,
+				element -> element.f0
+			)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingTwoInputOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testThreeInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements3 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedMultipleInputTransformation<Long> assertingTransformation = new KeyedMultipleInputTransformation<>(
+			"Asserting operator",
+			new AssertingThreeInputOperatorFactory(),
+			BasicTypeInfo.LONG_TYPE_INFO,
+			-1,
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+		assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector());
+		assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector());
+		assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector());
+
+		env.addOperator(assertingTransformation);
+		DataStream<Long> counts = new DataStream<>(env, assertingTransformation);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	private static class AssertingOperator extends AbstractStreamOperator<Long>
+			implements OneInputStreamOperator<Tuple2<Integer, byte[]>, Long>, BoundedOneInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
+			this.seenRecords++;
+			Integer incomingKey = element.getValue().f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput() {
+			output.collect(new StreamRecord<>(seenRecords));
+		}
+	}
+
+	private static class AssertingTwoInputOperator extends AbstractStreamOperator<Long>
+			implements TwoInputStreamOperator<Tuple2<Integer, byte[]>, Tuple2<Integer, byte[]>, Long>, BoundedMultiInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+		private boolean input1Finished = false;
+		private boolean input2Finished = false;
+
+		@Override
+		public void processElement1(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			processElement(element);
+		}
+
+		@Override
+		public void processElement2(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			processElement(element);
+		}
+
+		private void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			this.seenRecords++;
+			Integer incomingKey = element.getValue().f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput(int inputId) {
+			if (inputId == 1) {
+				input1Finished = true;
+			}
+
+			if (inputId == 2) {
+				input2Finished = true;
+			}
+
+			if (input1Finished && input2Finished) {
+				output.collect(new StreamRecord<>(seenRecords));
+			}
+		}
+	}
+
+	private static class AssertingThreeInputOperator extends AbstractStreamOperatorV2<Long>
+			implements MultipleInputStreamOperator<Long>, BoundedMultiInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+		private boolean input1Finished = false;
+		private boolean input2Finished = false;
+		private boolean input3Finished = false;
+
+		public AssertingThreeInputOperator(
+				StreamOperatorParameters<Long> parameters,
+				int numberOfInputs) {
+			super(parameters, 3);
+			assert numberOfInputs == 3;
+		}
+
+		private void processElement(Tuple2<Integer, byte[]> element) {
+			this.seenRecords++;
+			Integer incomingKey = element.f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput(int inputId) {
+			if (inputId == 1) {
+				input1Finished = true;
+			}
+
+			if (inputId == 2) {
+				input2Finished = true;
+			}
+
+			if (inputId == 3) {
+				input3Finished = true;
+			}
+
+			if (input1Finished && input2Finished && input3Finished) {
+				output.collect(new StreamRecord<>(seenRecords));
+			}
+		}
+
+		@Override
+		public List<Input> getInputs() {
+			return Arrays.asList(
+				new SingleInput(this::processElement),
+				new SingleInput(this::processElement),
+				new SingleInput(this::processElement)
+			);
+		}
+	}
+
+	private static class AssertingThreeInputOperatorFactory implements StreamOperatorFactory<Long> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
+			return (T) new AssertingThreeInputOperator(parameters, 3);
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+
+		}
+
+		@Override
+		public ChainingStrategy getChainingStrategy() {
+			return ChainingStrategy.NEVER;
+		}
+
+		@Override
+		public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+			return AssertingThreeInputOperator.class;
+		}
+	}
+
+	private static class SingleInput implements Input<Tuple2<Integer, byte[]>> {
+
+		private final Consumer<Tuple2<Integer, byte[]>> recordConsumer;
+
+		private SingleInput(Consumer<Tuple2<Integer, byte[]>> recordConsumer) {
+			this.recordConsumer = recordConsumer;
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
+			recordConsumer.accept(element.getValue());
+		}
+
+		@Override
+		public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+
+		}
+
+		@Override
+		public void processLatencyMarker(LatencyMarker latencyMarker) {
+
+		}
+
+		@Override
+		public void setKeyContextElement(StreamRecord<Tuple2<Integer, byte[]>> record) {
+
+		}
+	}
+
+	private static class InputGenerator extends SplittableIterator<Tuple2<Integer, byte[]>> {
+
+		private final long numberOfRecords;
+		private long generatedRecords;
+		private final Random rnd = new Random();
+		private final byte[] bytes = new byte[(int) MemorySize.ofMebiBytes(1).getBytes()];

Review comment:
       With 1MB of payload, you will be mostly benchmarking serialisation code and `memcpy` on the machine, which is not what we would like to do.
   
   As you want to micro-benchmark the code you have added, you should set up a benchmark that minimises other overheads. So I would be in favour of dropping this field entirely and just using a single integer record. Keep in mind that the purpose of this benchmark is to catch performance regressions in the future, so we should have here the worst-case scenario, in which a performance regression would be most easily noticeable.

##########
File path: src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SplittableIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/**
+ * An end to end test for sorted inputs for a keyed operator with bounded inputs.
+ */
+public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+
+	protected static final int RECORDS_PER_INVOCATION = 900_000;
+
+	public static void main(String[] args) throws RunnerException {
+		Options options = new OptionsBuilder()
+				.verbosity(VerboseMode.NORMAL)
+				.include(".*" + SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
+				.build();
+
+		new Runner(options).run();
+	}
+
+	@State(Thread)
+	public static class SortingInputContext extends FlinkEnvironmentContext {
+		@Param({"true", "false"})
+		public boolean useSingleKeyStateBackend;
+
+		@Override
+		protected Configuration createConfiguration() {
+			Configuration configuration = super.createConfiguration();
+			configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useSingleKeyStateBackend);
+			configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+			return configuration;
+		}
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void oneInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		SingleOutputStreamOperator<Long> counts = elements
+			.keyBy(element -> element.f0)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testTwoInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+		SingleOutputStreamOperator<Long> counts = elements1.connect(elements2)
+			.keyBy(
+				element -> element.f0,
+				element -> element.f0
+			)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingTwoInputOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testThreeInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements3 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedMultipleInputTransformation<Long> assertingTransformation = new KeyedMultipleInputTransformation<>(
+			"Asserting operator",
+			new AssertingThreeInputOperatorFactory(),
+			BasicTypeInfo.LONG_TYPE_INFO,
+			-1,
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+		assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector());
+		assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector());
+		assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector());
+
+		env.addOperator(assertingTransformation);
+		DataStream<Long> counts = new DataStream<>(env, assertingTransformation);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	private static class AssertingOperator extends AbstractStreamOperator<Long>
+			implements OneInputStreamOperator<Tuple2<Integer, byte[]>, Long>, BoundedOneInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
+			this.seenRecords++;
+			Integer incomingKey = element.getValue().f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput() {
+			output.collect(new StreamRecord<>(seenRecords));
+		}
+	}
+
+	private static class AssertingTwoInputOperator extends AbstractStreamOperator<Long>
+			implements TwoInputStreamOperator<Tuple2<Integer, byte[]>, Tuple2<Integer, byte[]>, Long>, BoundedMultiInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+		private boolean input1Finished = false;
+		private boolean input2Finished = false;
+
+		@Override
+		public void processElement1(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			processElement(element);
+		}
+
+		@Override
+		public void processElement2(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			processElement(element);
+		}
+
+		private void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) {
+			this.seenRecords++;
+			Integer incomingKey = element.getValue().f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput(int inputId) {
+			if (inputId == 1) {
+				input1Finished = true;
+			}
+
+			if (inputId == 2) {
+				input2Finished = true;
+			}
+
+			if (input1Finished && input2Finished) {
+				output.collect(new StreamRecord<>(seenRecords));
+			}
+		}
+	}
+
+	private static class AssertingThreeInputOperator extends AbstractStreamOperatorV2<Long>
+			implements MultipleInputStreamOperator<Long>, BoundedMultiInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+		private boolean input1Finished = false;
+		private boolean input2Finished = false;
+		private boolean input3Finished = false;
+
+		public AssertingThreeInputOperator(
+				StreamOperatorParameters<Long> parameters,
+				int numberOfInputs) {
+			super(parameters, 3);
+			assert numberOfInputs == 3;
+		}
+
+		private void processElement(Tuple2<Integer, byte[]> element) {
+			this.seenRecords++;
+			Integer incomingKey = element.f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}
+
+		@Override
+		public void endInput(int inputId) {
+			if (inputId == 1) {
+				input1Finished = true;
+			}
+
+			if (inputId == 2) {
+				input2Finished = true;
+			}
+
+			if (inputId == 3) {
+				input3Finished = true;
+			}
+
+			if (input1Finished && input2Finished && input3Finished) {
+				output.collect(new StreamRecord<>(seenRecords));
+			}
+		}
+
+		@Override
+		public List<Input> getInputs() {
+			return Arrays.asList(
+				new SingleInput(this::processElement),
+				new SingleInput(this::processElement),
+				new SingleInput(this::processElement)
+			);
+		}
+	}
+
+	private static class AssertingThreeInputOperatorFactory implements StreamOperatorFactory<Long> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
+			return (T) new AssertingThreeInputOperator(parameters, 3);
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+
+		}
+
+		@Override
+		public ChainingStrategy getChainingStrategy() {
+			return ChainingStrategy.NEVER;
+		}
+
+		@Override
+		public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+			return AssertingThreeInputOperator.class;
+		}
+	}
+
+	private static class SingleInput implements Input<Tuple2<Integer, byte[]>> {
+
+		private final Consumer<Tuple2<Integer, byte[]>> recordConsumer;
+
+		private SingleInput(Consumer<Tuple2<Integer, byte[]>> recordConsumer) {
+			this.recordConsumer = recordConsumer;
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
+			recordConsumer.accept(element.getValue());
+		}
+
+		@Override
+		public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+
+		}
+
+		@Override
+		public void processLatencyMarker(LatencyMarker latencyMarker) {
+
+		}
+
+		@Override
+		public void setKeyContextElement(StreamRecord<Tuple2<Integer, byte[]>> record) {
+
+		}
+	}
+
+	private static class InputGenerator extends SplittableIterator<Tuple2<Integer, byte[]>> {
+
+		private final long numberOfRecords;
+		private long generatedRecords;
+		private final Random rnd = new Random();
+		private final byte[] bytes = new byte[(int) MemorySize.ofMebiBytes(1).getBytes()];
+
+		private InputGenerator(long numberOfRecords) {
+			this.numberOfRecords = numberOfRecords;
+			rnd.nextBytes(bytes);
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public Iterator<Tuple2<Integer, byte[]>>[] split(int numPartitions) {
+			long numberOfRecordsPerPartition = numberOfRecords / numPartitions;
+			long remainder = numberOfRecords % numPartitions;
+			Iterator<Tuple2<Integer, byte[]>>[] iterators = new Iterator[numPartitions];
+
+			for (int i = 0; i < numPartitions - 1; i++) {
+				iterators[i] = new InputGenerator(numberOfRecordsPerPartition);
+			}
+
+			iterators[numPartitions - 1] = new InputGenerator(numberOfRecordsPerPartition + remainder);
+
+			return iterators;
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return (int) Math.min(numberOfRecords, Integer.MAX_VALUE);
+		}
+
+		@Override
+		public boolean hasNext() {
+			return generatedRecords < numberOfRecords;
+		}
+
+		@Override
+		public Tuple2<Integer, byte[]> next() {
+			if (hasNext()) {
+				generatedRecords++;
+				return Tuple2.of(
+					rnd.nextInt(10),

Review comment:
       Isn't `rnd.nextInt()` a costly operation? There are various [tricks](https://www.reddit.com/r/java/comments/29ifny/performance_of_randomnextintn/) you could use to speed this up, but I would suggest pre-generating a static array of random integers in the setup code. It could well be 1_000_000 records (4MB) if you want, but probably even 1_000 will be more than enough. Then here just iterate cyclically over it, over and over again.

##########
File path: src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SplittableIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/**
+ * An end to end test for sorted inputs for a keyed operator with bounded inputs.
+ */
+public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+
+	protected static final int RECORDS_PER_INVOCATION = 900_000;
+
+	public static void main(String[] args) throws RunnerException {
+		Options options = new OptionsBuilder()
+				.verbosity(VerboseMode.NORMAL)
+				.include(".*" + SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
+				.build();
+
+		new Runner(options).run();
+	}
+
+	@State(Thread)
+	public static class SortingInputContext extends FlinkEnvironmentContext {
+		@Param({"true", "false"})
+		public boolean useSingleKeyStateBackend;
+
+		@Override
+		protected Configuration createConfiguration() {
+			Configuration configuration = super.createConfiguration();
+			configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useSingleKeyStateBackend);
+			configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+			return configuration;
+		}
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void oneInputOperator(SortingInputContext context) {

Review comment:
       See the naming convention: https://github.com/apache/flink-benchmarks/#naming-convention
   
   Could you rename these benchmarks to `sortedOneInput`, `sortedTwoInput` and `sortedMultiInput`?

##########
File path: src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SplittableIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/**
+ * An end to end test for sorted inputs for a keyed operator with bounded inputs.
+ */
+public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+
+	protected static final int RECORDS_PER_INVOCATION = 900_000;
+
+	public static void main(String[] args) throws RunnerException {
+		Options options = new OptionsBuilder()
+				.verbosity(VerboseMode.NORMAL)
+				.include(".*" + SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
+				.build();
+
+		new Runner(options).run();
+	}
+
+	@State(Thread)
+	public static class SortingInputContext extends FlinkEnvironmentContext {
+		@Param({"true", "false"})
+		public boolean useSingleKeyStateBackend;
+
+		@Override
+		protected Configuration createConfiguration() {
+			Configuration configuration = super.createConfiguration();
+			configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useSingleKeyStateBackend);
+			configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+			return configuration;
+		}
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void oneInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		SingleOutputStreamOperator<Long> counts = elements
+			.keyBy(element -> element.f0)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testTwoInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+
+		DataStreamSource<Tuple2<Integer, byte[]>> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 2),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		);
+		SingleOutputStreamOperator<Long> counts = elements1.connect(elements2)
+			.keyBy(
+				element -> element.f0,
+				element -> element.f0
+			)
+			.transform(
+				"Asserting operator",
+				BasicTypeInfo.LONG_TYPE_INFO,
+				new AssertingTwoInputOperator()
+			);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	@Benchmark
+	@OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+	public void testThreeInputOperator(SortingInputContext context) {
+		StreamExecutionEnvironment env = context.env;
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements1 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements2 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedStream<Tuple2<Integer, byte[]>, Object> elements3 = env.fromParallelCollection(
+			new InputGenerator(RECORDS_PER_INVOCATION / 3),
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+		).keyBy(el -> el.f0);
+
+		KeyedMultipleInputTransformation<Long> assertingTransformation = new KeyedMultipleInputTransformation<>(
+			"Asserting operator",
+			new AssertingThreeInputOperatorFactory(),
+			BasicTypeInfo.LONG_TYPE_INFO,
+			-1,
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+		assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector());
+		assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector());
+		assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector());
+
+		env.addOperator(assertingTransformation);
+		DataStream<Long> counts = new DataStream<>(env, assertingTransformation);
+
+		counts.addSink(new DiscardingSink<>());
+	}
+
+	private static class AssertingOperator extends AbstractStreamOperator<Long>
+			implements OneInputStreamOperator<Tuple2<Integer, byte[]>, Long>, BoundedOneInput {
+		private final Set<Integer> seenKeys = new HashSet<>();
+		private long seenRecords = 0;
+		private Integer currentKey = null;
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
+			this.seenRecords++;
+			Integer incomingKey = element.getValue().f0;
+			if (!Objects.equals(incomingKey, currentKey)) {
+				if (!seenKeys.add(incomingKey)) {
+					Assert.fail("Received an out of order key: " + incomingKey);
+				}
+				this.currentKey = incomingKey;
+			}
+		}

Review comment:
       nit: maybe extract this code into a helper class `ProcessedKeysOrderAsserter` and reuse it for the 1-, 2- and 3-input operators?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org