You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 11:42:52 UTC
[flink] 02/02: [FLINK-10332][network] move data notification out of the synchronized block
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 607c5675a108114254c747ca3565c6f03a98434e
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200
[FLINK-10332][network] move data notification out of the synchronized block
None of the notifications actually relies on being issued under the lock, so
issuing them inside the synchronized block only causes lock contention.
This closes #6693.
---
.../network/partition/PipelinedSubpartition.java | 44 +++++++++++-----------
1 file changed, 22 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb..fe27d97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition {
private boolean add(BufferConsumer bufferConsumer, boolean finish) {
checkNotNull(bufferConsumer);
+ final boolean notifyDataAvailable;
synchronized (buffers) {
if (isFinished || isReleased) {
bufferConsumer.close();
@@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition {
buffers.add(bufferConsumer);
updateStatistics(bufferConsumer);
increaseBuffersInBacklog(bufferConsumer);
+ notifyDataAvailable = shouldNotifyDataAvailable() || finish;
- if (finish) {
- isFinished = true;
- notifyDataAvailable();
- }
- else {
- maybeNotifyDataAvailable();
- }
+ isFinished |= finish;
+ }
+
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
return true;
@@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition {
@Override
public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+ final boolean notifyDataAvailable;
synchronized (buffers) {
checkState(!isReleased);
checkState(readView == null,
@@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition {
parent.getOwningTaskName(), index, parent.getPartitionId());
readView = new PipelinedSubpartitionView(this, availabilityListener);
- if (!buffers.isEmpty()) {
- notifyDataAvailable();
- }
+ notifyDataAvailable = !buffers.isEmpty();
+ }
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
return readView;
@@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition {
@Override
public void flush() {
+ final boolean notifyDataAvailable;
synchronized (buffers) {
if (buffers.isEmpty()) {
return;
}
- if (!flushRequested) {
- flushRequested = true; // set this before the notification!
- // if there is more then 1 buffer, we already notified the reader
- // (at the latest when adding the second buffer)
- if (buffers.size() == 1) {
- notifyDataAvailable();
- }
- }
+ // if there is more than 1 buffer, we already notified the reader
+ // (at the latest when adding the second buffer)
+ notifyDataAvailable = !flushRequested && buffers.size() == 1;
+ flushRequested = true;
+ }
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
}
- private void maybeNotifyDataAvailable() {
+ private boolean shouldNotifyDataAvailable() {
// Notify only when we added first finished buffer.
- if (getNumberOfFinishedBuffers() == 1) {
- notifyDataAvailable();
- }
+ return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
}
private void notifyDataAvailable() {