You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/23 15:55:42 UTC
[flink] branch release-1.11 updated: [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 52fa6ab [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
52fa6ab is described below
commit 52fa6abdb7d1d479b36a4c76ccccfc9a295cb0b1
Author: Jiayi-Liao <bu...@gmail.com>
AuthorDate: Fri Jun 19 18:51:28 2020 +0800
[FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
---
.../network/partition/consumer/BufferManager.java | 6 ++++-
.../network/partition/consumer/InputChannel.java | 4 ++--
.../io/network/partition/consumer/InputGate.java | 2 +-
.../partition/consumer/RemoteInputChannel.java | 26 ++++++++++++----------
.../partition/consumer/SingleInputGate.java | 2 +-
.../network/partition/consumer/UnionInputGate.java | 2 +-
.../runtime/taskmanager/InputGateWithMetrics.java | 2 +-
.../partition/consumer/RemoteInputChannelTest.java | 12 ++++++++++
.../io/AlternatingCheckpointBarrierHandler.java | 2 +-
.../runtime/io/CheckpointBarrierAligner.java | 8 +++----
.../runtime/io/CheckpointBarrierHandler.java | 2 +-
11 files changed, 43 insertions(+), 25 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 764d558..429ccb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -198,7 +198,11 @@ public class BufferManager implements BufferListener, BufferRecycler {
}
}
- inputChannel.notifyBufferAvailable(numAddedBuffers);
+ try {
+ inputChannel.notifyBufferAvailable(numAddedBuffers);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
}
void releaseFloatingBuffers() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 0c47272..975fa5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -133,7 +133,7 @@ public abstract class InputChannel {
* exactly-once mode, the upstream will be blocked and become unavailable. This method
* tries to unblock the corresponding upstream and resume data consumption.
*/
- public abstract void resumeConsumption();
+ public abstract void resumeConsumption() throws IOException;
/**
* Notifies the owning {@link SingleInputGate} that this channel became non-empty.
@@ -154,7 +154,7 @@ public abstract class InputChannel {
public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
}
- protected void notifyBufferAvailable(int numAvailableBuffers) {
+ protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0489fde..ff46913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -114,7 +114,7 @@ public abstract class InputGate implements PullingAsyncDataInput<BufferOrEvent>,
return availabilityHelper.getAvailableFuture();
}
- public abstract void resumeConsumption(int channelIndex);
+ public abstract void resumeConsumption(int channelIndex) throws IOException;
/**
* Returns the channel of this gate.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ba8fc11..dfbbc6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -157,7 +157,7 @@ public class RemoteInputChannel extends InputChannel {
* Retriggers a remote subpartition request.
*/
void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
- checkState(partitionRequestClient != null, "Missing initial subpartition request.");
+ checkPartitionRequestQueueInitialized();
if (increaseBackoff()) {
partitionRequestClient.requestSubpartition(
@@ -169,9 +169,7 @@ public class RemoteInputChannel extends InputChannel {
@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
- checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
-
- checkError();
+ checkPartitionRequestQueueInitialized();
final Buffer next;
final boolean moreAvailable;
@@ -227,9 +225,7 @@ public class RemoteInputChannel extends InputChannel {
@Override
void sendTaskEvent(TaskEvent event) throws IOException {
checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
- checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
-
- checkError();
+ checkPartitionRequestQueueInitialized();
partitionRequestClient.sendTaskEvent(partitionId, event, this);
}
@@ -283,8 +279,8 @@ public class RemoteInputChannel extends InputChannel {
/**
* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
*/
- private void notifyCreditAvailable() {
- checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+ private void notifyCreditAvailable() throws IOException {
+ checkPartitionRequestQueueInitialized();
partitionRequestClient.notifyCreditAvailable(this);
}
@@ -330,16 +326,16 @@ public class RemoteInputChannel extends InputChannel {
* increased credit to the producer.
*/
@Override
- public void notifyBufferAvailable(int numAvailableBuffers) {
+ public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
notifyCreditAvailable();
}
}
@Override
- public void resumeConsumption() {
+ public void resumeConsumption() throws IOException {
checkState(!isReleased.get(), "Channel released.");
- checkState(partitionRequestClient != null, "Trying to send event to producer before requesting a queue.");
+ checkPartitionRequestQueueInitialized();
// notifies the producer that this channel is ready to
// unblock from checkpoint and resume data consumption
@@ -519,6 +515,12 @@ public class RemoteInputChannel extends InputChannel {
setError(cause);
}
+ private void checkPartitionRequestQueueInitialized() throws IOException {
+ checkError();
+ checkState(partitionRequestClient != null,
+ "Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
+ }
+
private static class BufferReorderingException extends IOException {
private static final long serialVersionUID = -888282210356266816L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0bd06c0..dcb4e2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -737,7 +737,7 @@ public class SingleInputGate extends IndexedInputGate {
}
@Override
- public void resumeConsumption(int channelIndex) {
+ public void resumeConsumption(int channelIndex) throws IOException {
// BEWARE: consumption resumption only happens for streaming jobs in which all slots
// are allocated together so there should be no UnknownInputChannel. As a result, it
// is safe to not synchronize the requestLock here. We will refactor the code to not
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index c05eef7..ef56a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -250,7 +250,7 @@ public class UnionInputGate extends InputGate {
}
@Override
- public void resumeConsumption(int channelIndex) {
+ public void resumeConsumption(int channelIndex) throws IOException {
// BEWARE: consumption resumption only happens for streaming jobs in which all
// slots are allocated together so there should be no UnknownInputChannel. We
// will refactor the code to not rely on this assumption in the future.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index fc6b635..806a461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -56,7 +56,7 @@ public class InputGateWithMetrics extends IndexedInputGate {
}
@Override
- public void resumeConsumption(int channelIndex) {
+ public void resumeConsumption(int channelIndex) throws IOException {
inputGate.resumeConsumption(channelIndex);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 3984dde..a2e1236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -335,6 +335,18 @@ public class RemoteInputChannelTest {
ch.getNextBuffer();
}
+ @Test(expected = PartitionConnectionException.class)
+ public void testPartitionConnectionException() throws IOException {
+ final ConnectionManager connManager = new TestingExceptionConnectionManager();
+ final SingleInputGate gate = createSingleInputGate(1);
+ final RemoteInputChannel ch = InputChannelTestUtils.createRemoteInputChannel(gate, 0, connManager);
+ gate.setInputChannels(ch);
+
+ gate.requestPartitions();
+
+ ch.getNextBuffer();
+ }
+
/**
* Tests to verify the behaviours of three different processes if the number of available
* buffers is less than required buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 2fb6e72..b287f1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -46,7 +46,7 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
}
@Override
- public void releaseBlocksAndResetBarriers() {
+ public void releaseBlocksAndResetBarriers() throws IOException {
activeHandler.releaseBlocksAndResetBarriers();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index a052a70..6ecfb42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -100,15 +100,15 @@ public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
}
@Override
- public void releaseBlocksAndResetBarriers() {
+ public void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
- blockedChannels.entrySet().forEach(blockedChannel -> {
+ for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
if (blockedChannel.getValue()) {
resumeConsumption(blockedChannel.getKey());
}
blockedChannel.setValue(false);
- });
+ }
// the next barrier that comes must assume it is the first
numBarriersReceived = 0;
@@ -338,7 +338,7 @@ public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
return numBarriersReceived > 0;
}
- private void resumeConsumption(InputChannelInfo channelInfo) {
+ private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
InputGate inputGate = inputGates[channelInfo.getGateIdx()];
checkState(!inputGate.isFinished(), "InputGate already finished.");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index fb0a319..c6d530b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -52,7 +52,7 @@ public abstract class CheckpointBarrierHandler implements Closeable {
this.toNotifyOnCheckpoint = checkNotNull(toNotifyOnCheckpoint);
}
- public void releaseBlocksAndResetBarriers() {
+ public void releaseBlocksAndResetBarriers() throws IOException {
}
/**