You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/23 15:55:42 UTC

[flink] branch release-1.11 updated: [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 52fa6ab  [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
52fa6ab is described below

commit 52fa6abdb7d1d479b36a4c76ccccfc9a295cb0b1
Author: Jiayi-Liao <bu...@gmail.com>
AuthorDate: Fri Jun 19 18:51:28 2020 +0800

    [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
---
 .../network/partition/consumer/BufferManager.java  |  6 ++++-
 .../network/partition/consumer/InputChannel.java   |  4 ++--
 .../io/network/partition/consumer/InputGate.java   |  2 +-
 .../partition/consumer/RemoteInputChannel.java     | 26 ++++++++++++----------
 .../partition/consumer/SingleInputGate.java        |  2 +-
 .../network/partition/consumer/UnionInputGate.java |  2 +-
 .../runtime/taskmanager/InputGateWithMetrics.java  |  2 +-
 .../partition/consumer/RemoteInputChannelTest.java | 12 ++++++++++
 .../io/AlternatingCheckpointBarrierHandler.java    |  2 +-
 .../runtime/io/CheckpointBarrierAligner.java       |  8 +++----
 .../runtime/io/CheckpointBarrierHandler.java       |  2 +-
 11 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 764d558..429ccb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -198,7 +198,11 @@ public class BufferManager implements BufferListener, BufferRecycler {
 			}
 		}
 
-		inputChannel.notifyBufferAvailable(numAddedBuffers);
+		try {
+			inputChannel.notifyBufferAvailable(numAddedBuffers);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrow(t);
+		}
 	}
 
 	void releaseFloatingBuffers() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 0c47272..975fa5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -133,7 +133,7 @@ public abstract class InputChannel {
 	 * exactly-once mode, the upstream will be blocked and become unavailable. This method
 	 * tries to unblock the corresponding upstream and resume data consumption.
 	 */
-	public abstract void resumeConsumption();
+	public abstract void resumeConsumption() throws IOException;
 
 	/**
 	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
@@ -154,7 +154,7 @@ public abstract class InputChannel {
 	public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
 	}
 
-	protected void notifyBufferAvailable(int numAvailableBuffers) {
+	protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0489fde..ff46913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -114,7 +114,7 @@ public abstract class InputGate implements PullingAsyncDataInput<BufferOrEvent>,
 		return availabilityHelper.getAvailableFuture();
 	}
 
-	public abstract void resumeConsumption(int channelIndex);
+	public abstract void resumeConsumption(int channelIndex) throws IOException;
 
 	/**
 	 * Returns the channel of this gate.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ba8fc11..dfbbc6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -157,7 +157,7 @@ public class RemoteInputChannel extends InputChannel {
 	 * Retriggers a remote subpartition request.
 	 */
 	void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
-		checkState(partitionRequestClient != null, "Missing initial subpartition request.");
+		checkPartitionRequestQueueInitialized();
 
 		if (increaseBackoff()) {
 			partitionRequestClient.requestSubpartition(
@@ -169,9 +169,7 @@ public class RemoteInputChannel extends InputChannel {
 
 	@Override
 	Optional<BufferAndAvailability> getNextBuffer() throws IOException {
-		checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
-
-		checkError();
+		checkPartitionRequestQueueInitialized();
 
 		final Buffer next;
 		final boolean moreAvailable;
@@ -227,9 +225,7 @@ public class RemoteInputChannel extends InputChannel {
 	@Override
 	void sendTaskEvent(TaskEvent event) throws IOException {
 		checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
-		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
-
-		checkError();
+		checkPartitionRequestQueueInitialized();
 
 		partitionRequestClient.sendTaskEvent(partitionId, event, this);
 	}
@@ -283,8 +279,8 @@ public class RemoteInputChannel extends InputChannel {
 	/**
 	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 	 */
-	private void notifyCreditAvailable() {
-		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+	private void notifyCreditAvailable() throws IOException {
+		checkPartitionRequestQueueInitialized();
 
 		partitionRequestClient.notifyCreditAvailable(this);
 	}
@@ -330,16 +326,16 @@ public class RemoteInputChannel extends InputChannel {
 	 * increased credit to the producer.
 	 */
 	@Override
-	public void notifyBufferAvailable(int numAvailableBuffers) {
+	public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
 		if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
 			notifyCreditAvailable();
 		}
 	}
 
 	@Override
-	public void resumeConsumption() {
+	public void resumeConsumption() throws IOException {
 		checkState(!isReleased.get(), "Channel released.");
-		checkState(partitionRequestClient != null, "Trying to send event to producer before requesting a queue.");
+		checkPartitionRequestQueueInitialized();
 
 		// notifies the producer that this channel is ready to
 		// unblock from checkpoint and resume data consumption
@@ -519,6 +515,12 @@ public class RemoteInputChannel extends InputChannel {
 		setError(cause);
 	}
 
+	private void checkPartitionRequestQueueInitialized() throws IOException {
+		checkError();
+		checkState(partitionRequestClient != null,
+				"Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
+	}
+
 	private static class BufferReorderingException extends IOException {
 
 		private static final long serialVersionUID = -888282210356266816L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0bd06c0..dcb4e2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -737,7 +737,7 @@ public class SingleInputGate extends IndexedInputGate {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		// BEWARE: consumption resumption only happens for streaming jobs in which all slots
 		// are allocated together so there should be no UnknownInputChannel. As a result, it
 		// is safe to not synchronize the requestLock here. We will refactor the code to not
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index c05eef7..ef56a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -250,7 +250,7 @@ public class UnionInputGate extends InputGate {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		// BEWARE: consumption resumption only happens for streaming jobs in which all
 		// slots are allocated together so there should be no UnknownInputChannel. We
 		// will refactor the code to not rely on this assumption in the future.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index fc6b635..806a461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -56,7 +56,7 @@ public class InputGateWithMetrics extends IndexedInputGate {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		inputGate.resumeConsumption(channelIndex);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 3984dde..a2e1236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -335,6 +335,18 @@ public class RemoteInputChannelTest {
 		ch.getNextBuffer();
 	}
 
+	@Test(expected = PartitionConnectionException.class)
+	public void testPartitionConnectionException() throws IOException {
+		final ConnectionManager connManager = new TestingExceptionConnectionManager();
+		final SingleInputGate gate = createSingleInputGate(1);
+		final RemoteInputChannel ch = InputChannelTestUtils.createRemoteInputChannel(gate, 0, connManager);
+		gate.setInputChannels(ch);
+
+		gate.requestPartitions();
+
+		ch.getNextBuffer();
+	}
+
 	/**
 	 * Tests to verify the behaviours of three different processes if the number of available
 	 * buffers is less than required buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 2fb6e72..b287f1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -46,7 +46,7 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 		activeHandler.releaseBlocksAndResetBarriers();
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index a052a70..6ecfb42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -100,15 +100,15 @@ public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 		LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
 
-		blockedChannels.entrySet().forEach(blockedChannel -> {
+		for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
 			if (blockedChannel.getValue()) {
 				resumeConsumption(blockedChannel.getKey());
 			}
 			blockedChannel.setValue(false);
-		});
+		}
 
 		// the next barrier that comes must assume it is the first
 		numBarriersReceived = 0;
@@ -338,7 +338,7 @@ public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
 		return numBarriersReceived > 0;
 	}
 
-	private void resumeConsumption(InputChannelInfo channelInfo) {
+	private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
 		InputGate inputGate = inputGates[channelInfo.getGateIdx()];
 		checkState(!inputGate.isFinished(), "InputGate already finished.");
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index fb0a319..c6d530b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -52,7 +52,7 @@ public abstract class CheckpointBarrierHandler implements Closeable {
 		this.toNotifyOnCheckpoint = checkNotNull(toNotifyOnCheckpoint);
 	}
 
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 	}
 
 	/**