You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/07 14:24:59 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #13529: [FLINK-19473] Implement multi inputs sorting DataInput

pnowojski commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501001585



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -260,20 +259,6 @@ private boolean allStreamStatusesAreIdle() {
 		return true;
 	}
 
-	private static class SourceInputProcessor<T> extends StreamOneInputProcessor<T> {

Review comment:
       Nit: change the commit title to:
   > [hotfix] Remove SourceInputProcessor in StreamMultipleInputProcessor

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public static StreamMultipleInputProcessor create(
+			CheckpointedInputGate[] checkpointedInputGates,
+			StreamConfig.InputConfig[] configuredInputs,
+			IOManager ioManager,
+			TaskIOMetricGroup ioMetricGroup,
+			Counter mainOperatorRecordsIn,
+			StreamStatusMaintainer streamStatusMaintainer,
+			MultipleInputStreamOperator<?> mainOperator,
+			MultipleInputSelectionHandler inputSelectionHandler,
+			WatermarkGauge[] inputWatermarkGauges,
+			OperatorChain<?, ?> operatorChain) {
+		checkNotNull(operatorChain);
+		checkNotNull(inputSelectionHandler);
+
+		List<Input> operatorInputs = mainOperator.getInputs();
+		int inputsCount = operatorInputs.size();
+
+		StreamOneInputProcessor<?>[] inputProcessors = new StreamOneInputProcessor[inputsCount];
+		Counter networkRecordsIn = new SimpleCounter();
+		ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+		MultiStreamStreamStatusTracker streamStatusTracker = new MultiStreamStreamStatusTracker(inputsCount);
+		checkState(
+			configuredInputs.length == inputsCount,
+			"Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
+			configuredInputs.length,
+			inputsCount);
+		for (int i = 0; i < inputsCount; i++) {
+			StreamConfig.InputConfig configuredInput = configuredInputs[i];
+			if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
+				StreamConfig.NetworkInputConfig networkInput = (StreamConfig.NetworkInputConfig) configuredInput;
+				StreamTaskNetworkOutput dataOutput = new StreamTaskNetworkOutput<>(
+					operatorInputs.get(i),
+					streamStatusMaintainer,
+					inputWatermarkGauges[i],
+					inputSelectionHandler, streamStatusTracker,
+					i,
+					mainOperatorRecordsIn,
+					networkRecordsIn);
+
+				inputProcessors[i] = new StreamOneInputProcessor(
+					new StreamTaskNetworkInput<>(
+						checkpointedInputGates[networkInput.getInputGateIndex()],
+						networkInput.getTypeSerializer(),
+						ioManager,
+						new StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+						i),
+					dataOutput,
+					operatorChain);
+			}
+			else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
+				StreamConfig.SourceInputConfig sourceInput = (StreamConfig.SourceInputConfig) configuredInput;
+				Output<StreamRecord<?>> chainedSourceOutput = operatorChain.getChainedSourceOutput(sourceInput);
+				StreamTaskSourceInput<?> sourceTaskInput = operatorChain.getSourceTaskInput(sourceInput);
+
+				inputProcessors[i] = new StreamOneInputProcessor(
+					sourceTaskInput,
+					new StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauges[i],
+						streamStatusTracker,
+						i),
+					operatorChain);
+			}
+			else {
+				throw new UnsupportedOperationException("Unknown input type: " + configuredInput);
+			}
+		}
+
+		return new StreamMultipleInputProcessor(
+			inputSelectionHandler,
+			inputProcessors
+		);
+	}
+
+
+	/**
+	 * Stream status tracker for the inputs. We need to keep track for determining when
+	 * to forward stream status changes downstream.
+	 */
+	private static class MultiStreamStreamStatusTracker {
+		private final StreamStatus[] streamStatuses;
+
+		private MultiStreamStreamStatusTracker(int numberOfInputs) {
+			this.streamStatuses = new StreamStatus[numberOfInputs];
+			Arrays.fill(streamStatuses, StreamStatus.ACTIVE);
+		}
+
+		public void setStreamStatus(int index, StreamStatus streamStatus) {
+			streamStatuses[index] = streamStatus;
+		}
+
+		public StreamStatus getStreamStatus(int index) {
+			return streamStatuses[index];
+		}
+
+		public boolean allStreamStatusesAreIdle() {
+			for (StreamStatus streamStatus : streamStatuses) {
+				if (streamStatus.isActive()) {
+					return false;
+				}
+			}
+			return true;
+		}
+	}

Review comment:
       Do I understand correctly that previously this functionality was hidden/implemented by non-static accesses between `StreamMultipleInputProcessor#StreamTaskNetworkOutput` (a non-static inner class) and `StreamMultipleInputProcessor`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -171,6 +168,10 @@ private int selectNextReadingInputIndex() throws IOException {
 		updateAvailability();
 		checkInputSelectionAgainstIsFinished();
 
+		if (inputSelectionHandler.isInputUnavailable(0) && inputSelectionHandler.isInputUnavailable(1)) {
+			fullCheckAndSetAvailable();
+		}

Review comment:
       Again, as explained above, `StreamTwoInputProcessor#updateAvailability()` should do the trick (modulo the starvation check in L181).

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java
##########
@@ -70,6 +70,10 @@ boolean areAllInputsSelected() {
 		return inputSelection.areAllInputsSelected();
 	}
 
+	boolean isInputUnavailable(int inputIndex) {

Review comment:
       nit: I would flip the method to `isInputAvailable(...)` and use it with the `!` operator.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -124,6 +120,7 @@ private int selectFirstReadingInputIndex() throws IOException {
 	}
 
 	private void checkFinished(InputStatus status) throws Exception {
+		updateAvailability();

Review comment:
       Why do we need this? The currently processed input should be updated via the returned `InputStatus`, so there is no need to check it. If the other input is unavailable and both inputs become unavailable, we would do the full check by waiting on the availability future, and once that future is completed we update the inputs (via the cheap `isApproximatelyAvailable()`).

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {

Review comment:
       Is this class being used anywhere in this commit besides in tests?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/EndOfInputAware.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+
+/**
+ * An interface for {@link OperatorChain} to extract the feature of ending the input.
+ *
+ * <p>Its purpose is mainly to make it easier to instantiate {@link StreamOneInputProcessor} which needs to
+ * notify the chain that an input has ended.
+ */
+@Internal
+public interface EndOfInputAware {
+
+	/**
+	 * Ends the main operator input specified by {@code inputId}.
+	 *
+	 * @param inputId the input ID starts from 1 which indicates the first input.
+	 */
+	void endMainOperatorInput(int inputId) throws Exception;
+}

Review comment:
       1. The name `MainOperator` doesn't fit here, I think. I would rename the method to `endInput`, but...
   2. Why not use the `BoundedMultiInput` interface instead? It's basically the same thing 😅 

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -23,16 +23,12 @@
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

Review comment:
       re
   > [hotfix] Fix availability handling in TwoInputStreamOperator
   
   0. Can you explain a bit more what the actual issue is? (I don't see what this commit is fixing.)
   1. Can you create a bug ticket in JIRA about this and change the commit title prefix from [hotfix] to [FLINK-XYZ]?
   2. This commit is definitely missing a test. In case you struggle to reproduce the issue: `StreamTwoInputProcessor` is tested on the task level in `StreamTaskSelectiveReadingTest` and `TwoInputStreamTaskTest`. Note that `TwoInputStreamTaskTestHarness`, which is used there, is in the process of being deprecated, so:
     a) it would be nice to implement the test based on `StreamTaskMailboxTestHarness`. So far it has mostly been used for multiple-input task tests, but it has also been used in source task and one-input task tests (for example `StreamTaskTest#testNotifyCheckpointOnClosedOperator`), so it should be easy to use for two inputs as well.
     b) the mailbox version of the test harness might be much better at reproducing a timing-dependent problem, because you can execute it step by step.
   3. It would actually be best to merge this commit in a separate PR, to verify the performance impact of this bug fix alone (to narrow down the suspects in case a regression is detected after merging).

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public static StreamMultipleInputProcessor create(
+			CheckpointedInputGate[] checkpointedInputGates,
+			StreamConfig.InputConfig[] configuredInputs,
+			IOManager ioManager,
+			TaskIOMetricGroup ioMetricGroup,
+			Counter mainOperatorRecordsIn,
+			StreamStatusMaintainer streamStatusMaintainer,
+			MultipleInputStreamOperator<?> mainOperator,
+			MultipleInputSelectionHandler inputSelectionHandler,
+			WatermarkGauge[] inputWatermarkGauges,
+			OperatorChain<?, ?> operatorChain) {
+		checkNotNull(operatorChain);
+		checkNotNull(inputSelectionHandler);
+
+		List<Input> operatorInputs = mainOperator.getInputs();
+		int inputsCount = operatorInputs.size();
+
+		StreamOneInputProcessor<?>[] inputProcessors = new StreamOneInputProcessor[inputsCount];
+		Counter networkRecordsIn = new SimpleCounter();
+		ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+		MultiStreamStreamStatusTracker streamStatusTracker = new MultiStreamStreamStatusTracker(inputsCount);
+		checkState(
+			configuredInputs.length == inputsCount,
+			"Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
+			configuredInputs.length,
+			inputsCount);
+		for (int i = 0; i < inputsCount; i++) {
+			StreamConfig.InputConfig configuredInput = configuredInputs[i];
+			if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
+				StreamConfig.NetworkInputConfig networkInput = (StreamConfig.NetworkInputConfig) configuredInput;
+				StreamTaskNetworkOutput dataOutput = new StreamTaskNetworkOutput<>(
+					operatorInputs.get(i),
+					streamStatusMaintainer,
+					inputWatermarkGauges[i],
+					inputSelectionHandler, streamStatusTracker,
+					i,
+					mainOperatorRecordsIn,
+					networkRecordsIn);
+
+				inputProcessors[i] = new StreamOneInputProcessor(
+					new StreamTaskNetworkInput<>(
+						checkpointedInputGates[networkInput.getInputGateIndex()],
+						networkInput.getTypeSerializer(),
+						ioManager,
+						new StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+						i),
+					dataOutput,
+					operatorChain);
+			}
+			else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
+				StreamConfig.SourceInputConfig sourceInput = (StreamConfig.SourceInputConfig) configuredInput;
+				Output<StreamRecord<?>> chainedSourceOutput = operatorChain.getChainedSourceOutput(sourceInput);
+				StreamTaskSourceInput<?> sourceTaskInput = operatorChain.getSourceTaskInput(sourceInput);
+
+				inputProcessors[i] = new StreamOneInputProcessor(
+					sourceTaskInput,
+					new StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer, inputWatermarkGauges[i],
+						streamStatusTracker,
+						i),

Review comment:
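       nit: formatting. The arguments of `new StreamTaskSourceOutput(...)` are wrapped inconsistently (several on the first line, then one per line); consider putting each argument on its own line.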

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortedInputITCase.java
##########
@@ -52,13 +56,16 @@
 import static org.junit.Assert.assertThat;
 
 /**
- * Longer running IT tests for {@link SortingDataInputTest}. For quicker smoke tests see {@link SortingDataInputTest}.
+ * Longer running IT tests for {@link SortingDataInput} and {@link MultiInputSortingDataInputs}.
+ *
+ * @see SortingDataInputTest
+ * @see MultiInputSortingDataInputsTest
  */
-public class SortingDataInputITCase {
+public class SortedInputITCase {

Review comment:
       The names here are a bit confusing: `Sorted` vs `Sorting`, and "longer running" is encoded only as `ITCase` vs `Test`.
   
   Maybe:
   `SortedInputITCase` -> `LargeSortedInputITCase` (or `LargeSortingInputITCase`)
   
   Also, maybe unify the `SortingDataInput` vs `SortedInput` naming, or does it make sense as it is?
   

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
##########
@@ -40,38 +48,78 @@
  */
 public class StreamTwoInputProcessorFactory {
 	public static <IN1, IN2> StreamTwoInputProcessor<IN1, IN2> create(
+			AbstractInvokable ownerTask,
 			CheckpointedInputGate[] checkpointedInputGates,
-			TypeSerializer<IN1> inputSerializer1,
-			TypeSerializer<IN2> inputSerializer2,
 			IOManager ioManager,
+			MemoryManager memoryManager,
 			TaskIOMetricGroup taskIOMetricGroup,
 			StreamStatusMaintainer streamStatusMaintainer,
 			TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
 			TwoInputSelectionHandler inputSelectionHandler,
 			WatermarkGauge input1WatermarkGauge,
 			WatermarkGauge input2WatermarkGauge,
 			OperatorChain<?, ?> operatorChain,
+			StreamConfig streamConfig,
+			Configuration taskManagerConfig,
+			Configuration jobConfig,
+			ExecutionConfig executionConfig,
+			ClassLoader userClassloader,
 			Counter numRecordsIn) {
 
 		checkNotNull(operatorChain);
 		checkNotNull(inputSelectionHandler);
+
 		StreamStatusTracker statusTracker = new StreamStatusTracker();
 		taskIOMetricGroup.reuseRecordsInputCounter(numRecordsIn);
+		TypeSerializer<IN1> typeSerializer1 = streamConfig.getTypeSerializerIn(0, userClassloader);
+		StreamTaskInput<IN1> input1 = new StreamTaskNetworkInput<>(
+			checkpointedInputGates[0],
+			typeSerializer1,
+			ioManager,
+			new StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
+			0);
+		TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
+		StreamTaskInput<IN2> input2 = new StreamTaskNetworkInput<>(
+			checkpointedInputGates[1],
+			typeSerializer2,
+			ioManager,
+			new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
+			1);
+
+		if (streamConfig.shouldSortInputs()) {
+			@SuppressWarnings("unchecked")
+			MultiInputSortingDataInputs<Object> multiInputs = new MultiInputSortingDataInputs<Object>(

Review comment:
       (I remember someone complaining about unchecked warnings 🙈 and `Object`? 😈  )

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -67,7 +63,8 @@ public StreamTwoInputProcessor(
 
 	@Override
 	public CompletableFuture<?> getAvailableFuture() {
-		if (inputSelectionHandler.areAllInputsSelected()) {
+		if (inputSelectionHandler.areAllInputsSelected() ||
+				(inputSelectionHandler.isInputUnavailable(0) && !inputSelectionHandler.isInputUnavailable(1))) {

Review comment:
       I don't get this condition. Why does the first input have to be unavailable while the second one is available?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org