Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/01 10:32:28 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #13521: [FLINK-19472] Implement a one input sorting DataInput

aljoscha commented on a change in pull request #13521:
URL: https://github.com/apache/flink/pull/13521#discussion_r498141996



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A serializer used in {@link SortingDataInput} for serializing elements alongside their key and
+ * timestamp. It serializes the record in a format known by the {@link FixedLengthByteKeyComparator}
+ * and {@link VariableLengthByteKeyComparator}.
+ *
+ * <p>If the key is of a known constant length, the length is not serialized with the data.
+ * The serialized data is laid out as follows (the key-length field is written only for
+ * variable-length keys):
+ *
+ * <pre>
+ *      [key-length] | &lt;key&gt; | &lt;timestamp&gt; | &lt;record&gt;
+ * </pre>
+ */
+final class KeyAndValueSerializer<IN> extends TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> {
+	private final TypeSerializer<IN> valueSerializer;
+	private final int serializedKeyLength;

Review comment:
       Maybe add a comment explaining that this is either the fixed key length or negative if the key length is variable.
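
       For illustration, a minimal sketch of such a comment (wording is a suggestion, not the committed code):

```java
/**
 * The fixed length of the serialized key in bytes, or a negative value
 * if the key is of variable length.
 */
private final int serializedKeyLength;
```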

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link StreamTaskInput} which sorts the incoming records from a chained input. It postpones
+ * emitting the records until it receives {@link InputStatus#END_OF_INPUT} from the chained input.
+ * Once sorting is done, it emits a single record at a time from the sorter.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received
+ * from the chained input. Moreover, the timestamps of incoming records are used for secondary ordering.
+ * For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length of the
+ * serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise.
+ *
+ * <p>Neither watermarks, stream statuses, nor latency markers are propagated downstream, as they do
+ * not make sense with buffered records. The input emits the MAX_WATERMARK after all records.
+ *
+ * @param <T> The type of the value in incoming {@link StreamRecord StreamRecords}.
+ * @param <K> The type of the key.
+ */
+public final class SortingDataInput<T, K> implements StreamTaskInput<T> {
+
+	private final StreamTaskInput<T> chained;
+	private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter;
+	private final KeySelector<T, K> keySelector;
+	private final TypeSerializer<K> keySerializer;
+	private final DataOutputSerializer dataOutputSerializer;
+	private final ForwardingDataOutput forwardingDataOutput;
+	private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null;
+	private boolean emittedLast;
+
+	public SortingDataInput(
+			StreamTaskInput<T> chained,
+			TypeSerializer<T> typeSerializer,
+			TypeSerializer<K> keySerializer,
+			KeySelector<T, K> keySelector,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			boolean objectReuse,
+			double managedMemoryFraction,
+			Configuration jobConfiguration,
+			AbstractInvokable containingTask) {
+		try {
+			this.forwardingDataOutput = new ForwardingDataOutput();
+			this.keySelector = keySelector;
+			this.keySerializer = keySerializer;
+			int keyLength = keySerializer.getLength();
+			final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator;
+			if (keyLength > 0) {
+				this.dataOutputSerializer = new DataOutputSerializer(keyLength);
+				comparator = new FixedLengthByteKeyComparator<>(keyLength);
+			} else {
+				this.dataOutputSerializer = new DataOutputSerializer(64);
+				comparator = new VariableLengthByteKeyComparator<>();
+			}
+			KeyAndValueSerializer<T> keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializer, keyLength);
+			this.chained = chained;
+			this.sorter = ExternalSorter.newBuilder(
+					memoryManager,
+					containingTask,
+					keyAndValueSerializer,
+					comparator)
+				.memoryFraction(managedMemoryFraction)
+				.enableSpilling(
+					ioManager,
+					jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+				.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+				.objectReuse(objectReuse)
+				.largeRecords(true)
+				.build();
+		} catch (MemoryAllocationException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public int getInputIndex() {
+		return chained.getInputIndex();
+	}
+
+	@Override
+	public CompletableFuture<Void> prepareSnapshot(
+			ChannelStateWriter channelStateWriter,
+			long checkpointId) throws IOException {
+		throw new UnsupportedOperationException("Checkpoints are not supported for sorting inputs");
+	}
+
+	@Override
+	public void close() throws IOException {
+		IOException ex = null;
+		try {
+			chained.close();
+		} catch (IOException e) {
+			ex = ExceptionUtils.firstOrSuppressed(e, ex);
+		}
+
+		try {
+			sorter.close();
+		} catch (IOException e) {
+			ex = ExceptionUtils.firstOrSuppressed(e, ex);
+		}
+
+		if (ex != null) {
+			throw ex;
+		}
+	}
+
+	private class ForwardingDataOutput implements DataOutput<T> {
+		@Override
+		public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
+			K key = keySelector.getKey(streamRecord.getValue());
+
+			keySerializer.serialize(key, dataOutputSerializer);
+			byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
+			dataOutputSerializer.clear();
+
+			sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
+		}
+
+		@Override
+		public void emitWatermark(Watermark watermark) throws Exception {
+
+		}
+
+		@Override
+		public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+
+		}
+
+		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+
+		}
+	}
+
+	@Override
+	public InputStatus emitNext(DataOutput<T> output) throws Exception {
+		if (sortedInput != null) {
+			return emitNextSortedRecord(output);
+		}
+
+		InputStatus inputStatus = chained.emitNext(forwardingDataOutput);
+		if (inputStatus == InputStatus.END_OF_INPUT) {
+			endSorting();
+			return emitNextSortedRecord(output);
+		}
+
+		return inputStatus;
+	}
+
+	@Nonnull
+	private InputStatus emitNextSortedRecord(DataOutput<T> output) throws Exception {
+		if (emittedLast) {
+			return InputStatus.END_OF_INPUT;
+		}
+
+		Tuple2<byte[], StreamRecord<T>> next = sortedInput.next();
+		if (next != null) {
+			output.emitRecord(next.f1);
+			return InputStatus.MORE_AVAILABLE;
+		} else {
+			emittedLast = true;
+			output.emitWatermark(Watermark.MAX_WATERMARK);

Review comment:
       We could keep track of the input watermark and emit the maximum at the end.
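
       A minimal sketch of that idea, assuming a new `seenWatermark` field (names here are illustrative, not part of the PR):

```java
// Sketch: new field on SortingDataInput
private long seenWatermark = Long.MIN_VALUE;

// In ForwardingDataOutput, remember the largest watermark instead of dropping it:
@Override
public void emitWatermark(Watermark watermark) {
    seenWatermark = Math.max(seenWatermark, watermark.getTimestamp());
}

// In emitNextSortedRecord(), once the sorter is drained, forward the maximum seen:
emittedLast = true;
if (seenWatermark > Long.MIN_VALUE) {
    output.emitWatermark(new Watermark(seenWatermark));
}
```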

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
##########
@@ -55,4 +56,14 @@
 						"throughput")
 				)
 				.build());
+
+	@Documentation.ExcludeFromDocumentation("The option is considered internal and should rather not be used" +

Review comment:
       Or in the `StreamNode`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;

Review comment:
       Unused

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link StreamTaskInput} which sorts the incoming records from a chained input. It postpones
+ * emitting the records until it receives {@link InputStatus#END_OF_INPUT} from the chained input.
+ * Once sorting is done, it emits a single record at a time from the sorter.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received
+ * from the chained input. Moreover, the timestamps of incoming records are used for secondary ordering.
+ * For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length of the
+ * serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise.
+ *
+ * <p>Neither watermarks, stream statuses, nor latency markers are propagated downstream, as they do
+ * not make sense with buffered records. The input emits the MAX_WATERMARK after all records.
+ *
+ * @param <T> The type of the value in incoming {@link StreamRecord StreamRecords}.
+ * @param <K> The type of the key.
+ */
+public final class SortingDataInput<T, K> implements StreamTaskInput<T> {
+
+	private final StreamTaskInput<T> chained;

Review comment:
       I think `wrappedInput` might be a better name because `chained` often means that something is attached "behind" or "afterwards".

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A serializer used in {@link SortingDataInput} for serializing elements alongside their key and
+ * timestamp. It serializes the record in a format known by the {@link FixedLengthByteKeyComparator}
+ * and {@link VariableLengthByteKeyComparator}.
+ *
+ * <p>If the key is of a known constant length, the length is not serialized with the data.
+ * The serialized data is laid out as follows (the key-length field is written only for
+ * variable-length keys):
+ *
+ * <pre>
+ *      [key-length] | &lt;key&gt; | &lt;timestamp&gt; | &lt;record&gt;
+ * </pre>
+ */
+final class KeyAndValueSerializer<IN> extends TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> {
+	private final TypeSerializer<IN> valueSerializer;
+	private final int serializedKeyLength;
+
+	KeyAndValueSerializer(TypeSerializer<IN> valueSerializer, int serializedKeyLength) {
+		this.valueSerializer = valueSerializer;
+		this.serializedKeyLength = serializedKeyLength;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> duplicate() {
+		return new KeyAndValueSerializer<>(valueSerializer.duplicate(), this.serializedKeyLength);
+	}
+
+	@Override
+	public Tuple2<byte[], StreamRecord<IN>> copy(Tuple2<byte[], StreamRecord<IN>> from) {
+		StreamRecord<IN> fromRecord = from.f1;
+		return Tuple2.of(
+			Arrays.copyOf(from.f0, from.f0.length),
+			fromRecord.copy(valueSerializer.copy(fromRecord.getValue()))
+		);
+	}
+
+	@Override
+	public Tuple2<byte[], StreamRecord<IN>> createInstance() {
+		return Tuple2.of(new byte[0], new StreamRecord<>(valueSerializer.createInstance()));
+	}
+
+	@Override
+	public Tuple2<byte[], StreamRecord<IN>> copy(
+			Tuple2<byte[], StreamRecord<IN>> from,
+			Tuple2<byte[], StreamRecord<IN>> reuse) {
+		StreamRecord<IN> fromRecord = from.f1;
+		StreamRecord<IN> reuseRecord = reuse.f1;
+
+		IN valueCopy = valueSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+		fromRecord.copyTo(valueCopy, reuseRecord);
+		reuse.f0 = Arrays.copyOf(from.f0, from.f0.length);
+		reuse.f1 = reuseRecord;
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(Tuple2<byte[], StreamRecord<IN>> record, DataOutputView target) throws IOException {
+		if (serializedKeyLength < 0) {
+			target.writeInt(record.f0.length);
+		}
+		target.write(record.f0);
+		StreamRecord<IN> toSerialize = record.f1;
+		target.writeLong(toSerialize.getTimestamp());
+		valueSerializer.serialize(toSerialize.getValue(), target);
+	}
+
+	@Override
+	public Tuple2<byte[], StreamRecord<IN>> deserialize(DataInputView source) throws IOException {
+		final int length = getKeyLength(source);
+		byte[] bytes = new byte[length];
+		source.readFully(bytes);
+		long timestamp = source.readLong();
+		IN value = valueSerializer.deserialize(source);
+		return Tuple2.of(
+			bytes,
+			new StreamRecord<>(value, timestamp)
+		);
+	}
+
+	@Override
+	public Tuple2<byte[], StreamRecord<IN>> deserialize(Tuple2<byte[], StreamRecord<IN>> reuse, DataInputView source) throws IOException {
+		final int length = getKeyLength(source);
+		byte[] bytes = new byte[length];
+		source.readFully(bytes);
+		long timestamp = source.readLong();
+		IN value = valueSerializer.deserialize(source);
+		StreamRecord<IN> reuseRecord = reuse.f1;
+		reuseRecord.replace(value, timestamp);
+		reuse.f0 = bytes;
+		reuse.f1 = reuseRecord;
+		return reuse;
+	}
+
+	private int getKeyLength(DataInputView source) throws IOException {
+		final int length;
+		if (serializedKeyLength < 0) {
+			length = source.readInt();
+		} else {
+			length = serializedKeyLength;
+		}
+		return length;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		final int length;
+		if (serializedKeyLength < 0) {
+			length = source.readInt();
+			target.writeInt(length);
+		} else {
+			length = serializedKeyLength;
+		}
+		for (int i = 0; i < length; i++) {
+			target.writeByte(source.readByte());
+		}
+		target.writeLong(source.readLong());
+		valueSerializer.copy(source, target);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		KeyAndValueSerializer<?> that = (KeyAndValueSerializer<?>) o;
+		return Objects.equals(valueSerializer, that.valueSerializer);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(valueSerializer);
+	}
+
+	@Override
+	public TypeSerializerSnapshot<Tuple2<byte[], StreamRecord<IN>>> snapshotConfiguration() {
+		return null;

Review comment:
       Should throw a meaningful exception because it's not meant to be used in a context where snapshots would be needed.
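
       Something along these lines, as a sketch:

```java
@Override
public TypeSerializerSnapshot<Tuple2<byte[], StreamRecord<IN>>> snapshotConfiguration() {
    // This serializer only lives inside the sorter and never becomes part of state snapshots.
    throw new UnsupportedOperationException(
        "KeyAndValueSerializer is not meant to be used in a context where snapshots are needed");
}
```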

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/KeyAndValueSerializer.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A serializer used in {@link SortingDataInput} for serializing elements alongside their key and
+ * timestamp. It serializes the record in a format known by the {@link FixedLengthByteKeyComparator}
+ * and {@link VariableLengthByteKeyComparator}.
+ *
+ * <p>If the key is of a known constant length, the length is not serialized with the data.
+ * The serialized data is laid out as follows (the key-length field is written only for
+ * variable-length keys):
+ *
+ * <pre>
+ *      [key-length] | &lt;key&gt; | &lt;timestamp&gt; | &lt;record&gt;
+ * </pre>
+ */
+final class KeyAndValueSerializer<IN> extends TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> {

Review comment:
       We should have a test based on `SerializerTestBase` for this.
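
       A possible skeleton for such a test, assuming the usual `SerializerTestBase` hooks; the `Long` keys and test data below are illustrative, and `Tuple2` comparing `byte[]` fields by reference may require extra equality handling:

```java
package org.apache.flink.streaming.api.operators.sort;

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class KeyAndValueSerializerTest extends SerializerTestBase<Tuple2<byte[], StreamRecord<Long>>> {

    @Override
    protected TypeSerializer<Tuple2<byte[], StreamRecord<Long>>> createSerializer() {
        // Long keys serialize to a fixed 8 bytes, so pass the fixed key length.
        return new KeyAndValueSerializer<>(LongSerializer.INSTANCE, Long.BYTES);
    }

    @Override
    protected int getLength() {
        return -1; // KeyAndValueSerializer#getLength() reports a variable length
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Class<Tuple2<byte[], StreamRecord<Long>>> getTypeClass() {
        return (Class<Tuple2<byte[], StreamRecord<Long>>>) (Class<?>) Tuple2.class;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Tuple2<byte[], StreamRecord<Long>>[] getTestData() {
        return new Tuple2[] {
            Tuple2.of(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}, new StreamRecord<>(1L, 0)),
            Tuple2.of(new byte[] {0, 0, 0, 0, 0, 0, 0, 2}, new StreamRecord<>(2L, 1)),
        };
    }
}
```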

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
##########
@@ -55,4 +56,14 @@
 						"throughput")
 				)
 				.build());
+
+	@Documentation.ExcludeFromDocumentation("The option is considered internal and should rather not be used" +

Review comment:
       I would not expose it as an option for now but only set it in the `StreamConfig`.
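
       Roughly like this, as a sketch (key and accessor names are hypothetical):

```java
// Inside StreamConfig, which already wraps a Configuration in its `config` field:
private static final String SORTED_INPUTS = "sorted-inputs";

public void setShouldSortInputs(boolean sortInputs) {
    config.setBoolean(SORTED_INPUTS, sortInputs);
}

public boolean shouldSortInputs() {
    return config.getBoolean(SORTED_INPUTS, false);
}
```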

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CircularQueues.java
##########
@@ -75,6 +75,9 @@ public void send(StageRunner.SortStage stage, CircularElement<E> element) {
 
 	@Override
 	public void sendResult(MutableObjectIterator<E> result) {
+		if (iteratorFuture.isDone()) {

Review comment:
       I think this and the changes in `MergeIterator`, `SpillingThread`, and `NormalizedKeySorter` should be in a separate cleanup/refactoring commit.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/FixedLengthByteKeyComparator.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A comparator used in {@link SortingDataInput} which compares records' keys and timestamps.
+ * It uses the binary format produced by the {@link KeyAndValueSerializer}.
+ *
+ * <p>It assumes keys are always of a fixed length and thus the key length is not serialized.
+ */
+final class FixedLengthByteKeyComparator<IN> extends TypeComparator<Tuple2<byte[], StreamRecord<IN>>> {

Review comment:
       It would be good to add tests based on `ComparatorTestBase` for the new comparators.
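
       A possible skeleton, assuming the usual `ComparatorTestBase` hooks; the `Long` keys and sorted data below are illustrative:

```java
package org.apache.flink.streaming.api.operators.sort;

import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class FixedLengthByteKeyComparatorTest
        extends ComparatorTestBase<Tuple2<byte[], StreamRecord<Long>>> {

    @Override
    protected TypeComparator<Tuple2<byte[], StreamRecord<Long>>> createComparator(boolean ascending) {
        // The comparator under review only sorts ascending, so `ascending` is ignored here;
        // descending-order tests may need to be skipped.
        return new FixedLengthByteKeyComparator<>(Long.BYTES);
    }

    @Override
    protected TypeSerializer<Tuple2<byte[], StreamRecord<Long>>> createSerializer() {
        return new KeyAndValueSerializer<>(LongSerializer.INSTANCE, Long.BYTES);
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Tuple2<byte[], StreamRecord<Long>>[] getSortedTestData() {
        // Sorted by key bytes first, then by timestamp.
        return new Tuple2[] {
            Tuple2.of(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}, new StreamRecord<>(1L, 0)),
            Tuple2.of(new byte[] {0, 0, 0, 0, 0, 0, 0, 2}, new StreamRecord<>(2L, 0)),
        };
    }
}
```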

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link StreamTaskInput} which sorts the incoming records from a chained input. It postpones
+ * emitting the records until it receives {@link InputStatus#END_OF_INPUT} from the chained input.
+ * Once sorting is done, it emits a single record at a time from the sorter.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received
+ * from the chained input. Moreover, the timestamps of incoming records are used for secondary ordering.
+ * For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length of the
+ * serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise.
+ *
+ * <p>Neither watermarks, stream statuses, nor latency markers are propagated downstream, as they do
+ * not make sense with buffered records. The input emits the MAX_WATERMARK after all records.
+ *
+ * @param <T> The type of the value in incoming {@link StreamRecord StreamRecords}.
+ * @param <K> The type of the key.
+ */
+public final class SortingDataInput<T, K> implements StreamTaskInput<T> {
+
+	private final StreamTaskInput<T> chained;
+	private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter;
+	private final KeySelector<T, K> keySelector;
+	private final TypeSerializer<K> keySerializer;
+	private final DataOutputSerializer dataOutputSerializer;
+	private final ForwardingDataOutput forwardingDataOutput;
+	private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null;
+	private boolean emittedLast;
+
+	public SortingDataInput(
+			StreamTaskInput<T> chained,
+			TypeSerializer<T> typeSerializer,
+			TypeSerializer<K> keySerializer,
+			KeySelector<T, K> keySelector,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			boolean objectReuse,
+			double managedMemoryFraction,
+			Configuration jobConfiguration,
+			AbstractInvokable containingTask) {
+		try {
+			this.forwardingDataOutput = new ForwardingDataOutput();
+			this.keySelector = keySelector;
+			this.keySerializer = keySerializer;
+			int keyLength = keySerializer.getLength();
+			final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator;
+			if (keyLength > 0) {
+				this.dataOutputSerializer = new DataOutputSerializer(keyLength);
+				comparator = new FixedLengthByteKeyComparator<>(keyLength);
+			} else {
+				this.dataOutputSerializer = new DataOutputSerializer(64);
+				comparator = new VariableLengthByteKeyComparator<>();
+			}
+			KeyAndValueSerializer<T> keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializer, keyLength);
+			this.chained = chained;
+			this.sorter = ExternalSorter.newBuilder(
+					memoryManager,
+					containingTask,
+					keyAndValueSerializer,
+					comparator)
+				.memoryFraction(managedMemoryFraction)
+				.enableSpilling(
+					ioManager,
+					jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+				.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+				.objectReuse(objectReuse)
+				.largeRecords(true)
+				.build();
+		} catch (MemoryAllocationException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public int getInputIndex() {
+		return chained.getInputIndex();
+	}
+
+	@Override
+	public CompletableFuture<Void> prepareSnapshot(
+			ChannelStateWriter channelStateWriter,
+			long checkpointId) throws IOException {

Review comment:
       My IDE is warning me about a bunch of exceptions that are declared but never thrown in these methods.
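
       E.g. the unused `throws` clauses could simply be dropped, since overriding methods may narrow them (sketch):

```java
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) {
    // No IOException can actually occur here, so the throws clause is gone.
    throw new UnsupportedOperationException("Checkpoints are not supported for sorting inputs");
}
```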

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.streaming.api.operators.sort.FixedLengthByteKeyComparator.TIMESTAMP_BYTE_SIZE;
+
+/**
+ * Utility class for common key normalization used both in {@link VariableLengthByteKeyComparator}
+ * and {@link FixedLengthByteKeyComparator}.
+ */
+final class BytesKeyNormalizationUtil {
+	/**
+	 * Writes the normalized key of the given record. The normalized key consists of the key serialized as bytes and
+	 * the timestamp of the record.
+	 *
+	 * <p>NOTE: The key does not represent a logical order. It can be used only for grouping keys!
+	 */
+	static <IN> void putNormalizedKey(
+			Tuple2<byte[], StreamRecord<IN>> record,
+			int dataLength,
+			MemorySegment target,
+			int offset,
+			int numBytes) {
+		byte[] data = record.f0;
+
+		if (dataLength >= numBytes) {
+			putBytesArray(target, offset, numBytes, data);
+		} else {
+			// whole key fits into the normalized key
+			putBytesArray(target, offset, dataLength, data);
+			int lastOffset = offset + numBytes;
+			offset += dataLength;
+			long valueOfTimestamp = record.f1.asRecord().getTimestamp() - Long.MIN_VALUE;
+			if (dataLength + TIMESTAMP_BYTE_SIZE <= numBytes) {
+				// whole timestamp fits into the normalized key
+				target.putLong(offset, valueOfTimestamp);
+				offset += TIMESTAMP_BYTE_SIZE;
+				// fill in the remaining space with zeros
+				while (offset < lastOffset) {
+					target.put(offset++, (byte) 0);
+				}
+			} else {
+				// only part of the timestamp fits into normalized key
+				for (int i = 0; offset < lastOffset; offset++, i++) {
+					target.put(offset, (byte) (valueOfTimestamp >>> ((7 - i) << 3)));
+				}
+			}
+		}
+	}
+
+	private static void putBytesArray(MemorySegment target, int offset, int numBytes, byte[] data) {
+		for (int i = 0; i < numBytes; i++) {
+			// default case, full normalized key. need to explicitly convert to int to
+			// avoid false results due to implicit type conversion to int when subtracting
+			// the min byte value
+			int highByte = data[i] & 0xff;
+			highByte -= Byte.MIN_VALUE;

Review comment:
       Just in case anyone is wondering what's happening here: We're converting the `signed byte` in `data` into an unsigned representation. A Java `byte` goes from `-128` to `127`, i.e. is signed. By subtracting `-128` (`Byte.MIN_VALUE`) here we're shifting the number to be from `0` to `255`. The normalized key sorter sorts bytes as "unsigned", so we need to convert here to maintain a correct ordering.
   
   Maybe add this as a comment in the code.
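
   For reference, that comment could look roughly like this in the code (sketch, wording adapted from the explanation above):

```java
// Convert the signed Java byte (range -128 to 127) into the unsigned
// representation expected by the normalized key sorter, which compares
// bytes as unsigned values: widen to int without sign extension (& 0xff),
// then shift by subtracting Byte.MIN_VALUE to keep the ordering correct.
int highByte = data[i] & 0xff;
highByte -= Byte.MIN_VALUE;
```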




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org