You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/03 12:55:05 UTC
[flink] branch release-1.13 updated: [FLINK-22368] Deque channel
after releasing on EndOfPartition
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new b41c5fa [FLINK-22368] Deque channel after releasing on EndOfPartition
b41c5fa is described below
commit b41c5fab3e3eebb04b891ec2ac0688bedc2d94eb
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Apr 22 00:49:13 2021 +0200
[FLINK-22368] Deque channel after releasing on EndOfPartition
...and don't enqueue the channel if it has previously received
EndOfPartition.
Leaving a released channel enqueued may lead to a
CancelTaskException, which can prevent EndOfPartitionEvent
propagation and leave the job stuck.
---
.../partition/consumer/SingleInputGate.java | 33 +++++++++++++---------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1b09e60..03a2a50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -156,6 +156,7 @@ public class SingleInputGate extends IndexedInputGate {
@GuardedBy("inputChannelsWithData")
private final BitSet enqueuedInputChannelsWithData;
+ @GuardedBy("inputChannelsWithData")
private final BitSet channelsWithEndOfPartitionEvents;
@GuardedBy("inputChannelsWithData")
@@ -729,16 +730,24 @@ public class SingleInputGate extends IndexedInputGate {
}
if (event.getClass() == EndOfPartitionEvent.class) {
- channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
-
- if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+ synchronized (inputChannelsWithData) {
+ checkState(!channelsWithEndOfPartitionEvents.get(currentChannel.getChannelIndex()));
+ channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+ hasReceivedAllEndOfPartitionEvents =
+ channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels;
+
+ enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+ if (inputChannelsWithData.contains(currentChannel)) {
+ inputChannelsWithData.getAndRemove(channel -> channel == currentChannel);
+ }
+ }
+ if (hasReceivedAllEndOfPartitionEvents) {
// Because of race condition between:
// 1. releasing inputChannelsWithData lock in this method and reaching this place
- // 2. empty data notification that re-enqueues a channel
- // we can end up with moreAvailable flag set to true, while we expect no more data.
+ // 2. empty data notification that re-enqueues a channel we can end up with
+ // moreAvailable flag set to true, while we expect no more data.
checkState(!moreAvailable || !pollNext().isPresent());
moreAvailable = false;
- hasReceivedAllEndOfPartitionEvents = true;
markAvailable();
}
@@ -859,12 +868,6 @@ public class SingleInputGate extends IndexedInputGate {
return;
}
- if (channel.isReleased()) {
- // when channel is closed, EndOfPartitionEvent is send and a final notification
- // if EndOfPartitionEvent causes a release, we must ignore the notification
- return;
- }
-
if (!queueChannelUnsafe(channel, priority)) {
return;
}
@@ -889,13 +892,17 @@ public class SingleInputGate extends IndexedInputGate {
}
/**
- * Queues the channel if not already enqueued, potentially raising the priority.
+ * Queues the channel if not already enqueued and not received EndOfPartition, potentially
+ * raising the priority.
*
* @return true iff it has been enqueued/prioritized = some change to {@link
* #inputChannelsWithData} happened
*/
private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
assert Thread.holdsLock(inputChannelsWithData);
+ if (channelsWithEndOfPartitionEvents.get(channel.getChannelIndex())) {
+ return false;
+ }
final boolean alreadyEnqueued =
enqueuedInputChannelsWithData.get(channel.getChannelIndex());