You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/03 12:55:05 UTC

[flink] branch release-1.13 updated: [FLINK-22368] Dequeue channel after releasing on EndOfPartition

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new b41c5fa  [FLINK-22368] Deque channel after releasing on EndOfPartition
b41c5fa is described below

commit b41c5fab3e3eebb04b891ec2ac0688bedc2d94eb
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Apr 22 00:49:13 2021 +0200

    [FLINK-22368] Deque channel after releasing on EndOfPartition
    
    ...and don't enqueue the channel if it received EndOfPartition
    previously.
    
    Leaving a released channel enqueued may lead to a
    CancelTaskException, which can prevent EndOfPartitionEvent
    propagation and leave the job stuck.
---
 .../partition/consumer/SingleInputGate.java        | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1b09e60..03a2a50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -156,6 +156,7 @@ public class SingleInputGate extends IndexedInputGate {
     @GuardedBy("inputChannelsWithData")
     private final BitSet enqueuedInputChannelsWithData;
 
+    @GuardedBy("inputChannelsWithData")
     private final BitSet channelsWithEndOfPartitionEvents;
 
     @GuardedBy("inputChannelsWithData")
@@ -729,16 +730,24 @@ public class SingleInputGate extends IndexedInputGate {
         }
 
         if (event.getClass() == EndOfPartitionEvent.class) {
-            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
-
-            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+            synchronized (inputChannelsWithData) {
+                checkState(!channelsWithEndOfPartitionEvents.get(currentChannel.getChannelIndex()));
+                channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+                hasReceivedAllEndOfPartitionEvents =
+                        channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels;
+
+                enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+                if (inputChannelsWithData.contains(currentChannel)) {
+                    inputChannelsWithData.getAndRemove(channel -> channel == currentChannel);
+                }
+            }
+            if (hasReceivedAllEndOfPartitionEvents) {
                 // Because of race condition between:
                 // 1. releasing inputChannelsWithData lock in this method and reaching this place
-                // 2. empty data notification that re-enqueues a channel
-                // we can end up with moreAvailable flag set to true, while we expect no more data.
+                // 2. empty data notification that re-enqueues a channel we can end up with
+                // moreAvailable flag set to true, while we expect no more data.
                 checkState(!moreAvailable || !pollNext().isPresent());
                 moreAvailable = false;
-                hasReceivedAllEndOfPartitionEvents = true;
                 markAvailable();
             }
 
@@ -859,12 +868,6 @@ public class SingleInputGate extends IndexedInputGate {
                     return;
                 }
 
-                if (channel.isReleased()) {
-                    // when channel is closed, EndOfPartitionEvent is send and a final notification
-                    // if EndOfPartitionEvent causes a release, we must ignore the notification
-                    return;
-                }
-
                 if (!queueChannelUnsafe(channel, priority)) {
                     return;
                 }
@@ -889,13 +892,17 @@ public class SingleInputGate extends IndexedInputGate {
     }
 
     /**
-     * Queues the channel if not already enqueued, potentially raising the priority.
+     * Queues the channel if not already enqueued and not received EndOfPartition, potentially
+     * raising the priority.
      *
      * @return true iff it has been enqueued/prioritized = some change to {@link
      *     #inputChannelsWithData} happened
      */
     private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
         assert Thread.holdsLock(inputChannelsWithData);
+        if (channelsWithEndOfPartitionEvents.get(channel.getChannelIndex())) {
+            return false;
+        }
 
         final boolean alreadyEnqueued =
                 enqueuedInputChannelsWithData.get(channel.getChannelIndex());