You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/05/04 13:02:40 UTC

[flink] branch master updated: [FLINK-22548][network] Remove illegal unsynchronized access to PipelinedSubpartition#buffers

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1554292  [FLINK-22548][network] Remove illegal unsynchronized access to PipelinedSubpartition#buffers
1554292 is described below

commit 15542925765598a89e82fcdabf5a5f3a8954d94e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 4 12:32:30 2021 +0200

    [FLINK-22548][network] Remove illegal unsynchronized access to PipelinedSubpartition#buffers
    
    After peeking last buffer, this last buffer could have been processed
    and recycled by the task thread, before NetworkActionsLogger.traceRecover
    would manage to check it's content causing IllegalReferenceCount exceptions.
    
    Further more, this log seemed excessive and was accidentally added after
    debugging session, so it's ok to remove it.
---
 .../runtime/io/network/partition/PipelinedSubpartition.java    | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c562028..e660c28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -130,21 +130,13 @@ public class PipelinedSubpartition extends ResultSubpartition
     @Override
     public void addRecovered(BufferConsumer bufferConsumer) throws IOException {
         NetworkActionsLogger.traceRecover(
-                "ResultSubpartitionRecoveredStateHandler#recover",
+                "PipelinedSubpartition#addRecovered",
                 bufferConsumer,
                 parent.getOwningTaskName(),
                 subpartitionInfo);
         if (!add(bufferConsumer, Integer.MIN_VALUE)) {
             throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
         }
-        BufferConsumerWithPartialRecordLength last = buffers.peekLast();
-        if (last != null) {
-            NetworkActionsLogger.traceRecover(
-                    "PipelinedSubpartition#addRecover",
-                    last.getBufferConsumer(),
-                    parent.getOwningTaskName(),
-                    subpartitionInfo);
-        }
     }
 
     @Override