Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2018/07/04 21:48:04 UTC
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/6257
[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock
## What is the purpose of the change
A deadlock could occur when exclusive buffers of a `RemoteInputChannel` and (other/floating) buffers of the buffer pool are recycled concurrently, the `RemoteInputChannel` is registered as a listener to that buffer pool, and adding the exclusive buffer triggers a floating buffer to be recycled back to the same buffer pool. The two threads then each hold one of the locks on `LocalBufferPool#availableMemorySegments` and `RemoteInputChannel#bufferQueue` and try to acquire the other, i.e. they acquire them in reverse order.
One such instance would be (thanks @zhijiangW for finding this):
```
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
-> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
-> try to lock(RemoteInputChannel2#bufferQueue)
```
```
Task thread -> RemoteInputChannel2#recycle
-> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
-> try to lock(LocalBufferPool#availableMemorySegments)
```
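The two call chains above boil down to a classic lock-order inversion. The following is a minimal, self-contained sketch of that situation, not Flink code: `LockOrderSketch`, `poolLock` and `queueLock` are hypothetical names standing in for the monitors on `LocalBufferPool#availableMemorySegments` and `RemoteInputChannel#bufferQueue`. It uses the non-blocking `tryLock()` so that it reports the conflict instead of hanging.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;

/** Illustration only: two threads take the same two locks in opposite order. */
public class LockOrderSketch {

    /** Returns {cancelerGotSecondLock, taskGotSecondLock}. */
    public static boolean[] run() throws Exception {
        ReentrantLock poolLock = new ReentrantLock();  // stands in for availableMemorySegments
        ReentrantLock queueLock = new ReentrantLock(); // stands in for bufferQueue
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean[] acquired = new boolean[2];

        // "Task canceler thread": pool lock first, then the channel's queue lock.
        Thread canceler = new Thread(() -> acquired[0] = attempt(poolLock, queueLock, barrier));
        // "Task thread": queue lock first, then the pool lock.
        Thread task = new Thread(() -> acquired[1] = attempt(queueLock, poolLock, barrier));

        canceler.start();
        task.start();
        canceler.join();
        task.join();
        return acquired;
    }

    private static boolean attempt(ReentrantLock first, ReentrantLock second, CyclicBarrier barrier) {
        first.lock();
        try {
            barrier.await();                // both threads now hold their first lock
            boolean got = second.tryLock(); // a blocking lock() here would wait forever
            if (got) {
                second.unlock();
            }
            barrier.await();                // keep the first lock until both have tried
            return got;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }
}
```

In this sketch neither thread obtains its second lock; with `synchronized` blocks, as in the real code, both would block instead.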
This PR is a second approach to #6254 and solves the deadlock on the `LocalBufferPool` side as the other solution turned out to be even more complex than what's already in the PR (I'll update that PR in a second).
@pnowojski and @tillrohrmann can you also have a quick look so that this can get into 1.5.1?
## Brief change log
- clarify the contract of `BufferListener#notifyBufferAvailable()` (see in the code)
- make sure that `LocalBufferPool#recycle()` does not break this contract, i.e. call the listener's callback outside the lock around `LocalBufferPool#availableMemorySegments`
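As a rough illustration of the second point, the pattern is to pick the listener while holding the lock but to invoke it only after the lock has been released. The sketch below is a simplified stand-in under that assumption, not the actual `LocalBufferPool`: `PoolSketch`, `Listener`, `onBufferAvailable` and `holdsPoolLock` are hypothetical names, and the destroyed-pool and error handling of the real change are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified buffer pool that notifies listeners outside its lock. */
public class PoolSketch {

    /** Hypothetical listener; returns true if it wants to receive further buffers. */
    public interface Listener {
        boolean onBufferAvailable(Object buffer);
    }

    private final Object lock = new Object();
    private final Queue<Object> available = new ArrayDeque<>();
    private final Queue<Listener> listeners = new ArrayDeque<>();

    public void register(Listener listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    public int availableCount() {
        synchronized (lock) {
            return available.size();
        }
    }

    /** Lets a callback check whether the calling thread holds the pool lock. */
    public boolean holdsPoolLock() {
        return Thread.holdsLock(lock);
    }

    public void recycle(Object buffer) {
        Listener listener;
        synchronized (lock) {
            listener = listeners.poll();
            if (listener == null) {
                // Nobody is waiting: keep the buffer and wake up a blocked requester.
                available.add(buffer);
                lock.notify();
                return;
            }
        }

        // The callback runs without the pool lock, so it may take its own locks safely.
        boolean needsMore = listener.onBufferAvailable(buffer);

        if (needsMore) {
            synchronized (lock) {
                listeners.add(listener);
            }
        }
    }
}
```

The price of this design is a second, short critical section for re-registering the listener, during which the pool state may have changed; the actual change handles that case (e.g. a destroyed pool) explicitly.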
## Verifying this change
This change added tests and can be verified as follows:
- added `RemoteInputChannelTest#testConcurrentRecycleAndRelease2` which catches this deadlock quite quickly
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (per buffer, but we're only moving recycling out of the synchronized block so if there's any effect, it should be positive)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-9676-lbp
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6257.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6257
----
commit d69ff968f8647efa13adcc6f338e411675b36d68
Author: Nico Kruber <ni...@...>
Date: 2018-07-04T15:45:18Z
[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock
When recycling exclusive buffers of a RemoteInputChannel and recycling
(other/floating) buffers to the buffer pool concurrently while the
RemoteInputChannel is registered as a listener to the buffer pool and adding the
exclusive buffer triggers a floating buffer to be recycled back to the same
buffer pool, a deadlock would occur holding locks on
LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue but
acquiring them in reverse order.
One such instance would be:
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
-> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
-> try to lock(RemoteInputChannel2#bufferQueue)
Task thread -> RemoteInputChannel2#recycle
-> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
-> try to lock(LocalBufferPool#availableMemorySegments)
Therefore, we decouple the listener callback from the lock around
LocalBufferPool#availableMemorySegments and implicitly enforce that
RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. must
be acquired first and should never be taken after having locked on
LocalBufferPool#availableMemorySegments.
----
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200283214
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
--- End diff --
actually, let's do this in a follow-up PR
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200274433
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
--- End diff --
true, this ~could~ should be done outside the lock as well
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6257
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200251500
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
--- End diff --
Currently, `notifyBufferDestroyed` does nothing, but if this method is implemented in the future, we should take the same care as with `notifyBufferAvailable`.
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200287730
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
}
- catch (Throwable ignored) {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
--- End diff --
👍
---
[GitHub] flink issue #6257: [FLINK-9676][network] clarify contracts of BufferListener...
Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:
https://github.com/apache/flink/pull/6257
Thanks for fixing this bug; we also solved this problem the same way.
This solution seems more lightweight than the approach in [6254](https://github.com/apache/flink/pull/6254), though I also think the lock adjustments in `6254` are a useful reference.
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200252253
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
}
- catch (Throwable ignored) {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
--- End diff --
I am wondering whether we should rethrow this exception at the end, after the handling below.
For example: if the `isWaitingForFloatingBuffers` flag turns out to be inconsistent during `RemoteInputChannel#notifyBufferAvailable`, we should throw this exception to trigger a failover; otherwise, we cannot detect the potential bug.
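One possible shape of such a rethrow is sketched below. It is only an illustration of the idea and not the code from this PR or any follow-up: `RethrowSketch` and `handOver` are hypothetical names, and a plain `Predicate` stands in for the listener callback. The buffer is put back under the lock first, and the original failure is then propagated to the caller.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical sketch: clean up after a failing callback, then rethrow. */
public class RethrowSketch {

    private final Object lock = new Object();
    private final Queue<Object> available = new ArrayDeque<>();

    public int availableCount() {
        synchronized (lock) {
            return available.size();
        }
    }

    /**
     * Hands the buffer to the callback outside the lock. If the callback fails,
     * the buffer goes back into the pool and the failure is rethrown.
     */
    public void handOver(Object buffer, Predicate<Object> callback) {
        try {
            callback.test(buffer);
        } catch (RuntimeException | Error failure) {
            synchronized (lock) {
                // Do not lose the buffer just because the listener failed.
                available.add(buffer);
                lock.notify();
            }
            // Surface the problem instead of swallowing it.
            throw failure;
        }
    }
}
```

A caller would then see the listener's exception, for example an `IllegalStateException` from an inconsistent flag, while the pool still gets its buffer back.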
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200287796
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
--- End diff --
👍
---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6257#discussion_r200274038
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener = registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have been done if we only had one synchronized block
+ if (needMoreBuffers) {
+ listener.notifyBufferDestroyed();
}
- catch (Throwable ignored) {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
--- End diff --
true, that's why I created FLINK-9755 for this issue and already have code (have to add tests though) - expect a PR soon
---