You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/05/04 13:02:40 UTC
[flink] branch master updated: [FLINK-22548][network] Remove
illegal unsynchronized access to PipelinedSubpartition#buffers
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1554292 [FLINK-22548][network] Remove illegal unsynchronized access to PipelinedSubpartition#buffers
1554292 is described below
commit 15542925765598a89e82fcdabf5a5f3a8954d94e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 4 12:32:30 2021 +0200
[FLINK-22548][network] Remove illegal unsynchronized access to PipelinedSubpartition#buffers
After peeking last buffer, this last buffer could have been processed
and recycled by the task thread, before NetworkActionsLogger.traceRecover
would manage to check it's content causing IllegalReferenceCount exceptions.
Further more, this log seemed excessive and was accidentally added after
debugging session, so it's ok to remove it.
---
.../runtime/io/network/partition/PipelinedSubpartition.java | 10 +---------
1 file changed, 1 insertion(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c562028..e660c28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -130,21 +130,13 @@ public class PipelinedSubpartition extends ResultSubpartition
@Override
public void addRecovered(BufferConsumer bufferConsumer) throws IOException {
NetworkActionsLogger.traceRecover(
- "ResultSubpartitionRecoveredStateHandler#recover",
+ "PipelinedSubpartition#addRecovered",
bufferConsumer,
parent.getOwningTaskName(),
subpartitionInfo);
if (!add(bufferConsumer, Integer.MIN_VALUE)) {
throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
}
- BufferConsumerWithPartialRecordLength last = buffers.peekLast();
- if (last != null) {
- NetworkActionsLogger.traceRecover(
- "PipelinedSubpartition#addRecover",
- last.getBufferConsumer(),
- parent.getOwningTaskName(),
- subpartitionInfo);
- }
}
@Override