Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2018/07/04 21:48:04 UTC

[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/6257

    [FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock

    ## What is the purpose of the change
    
    When exclusive buffers of a `RemoteInputChannel` are recycled concurrently with (other/floating) buffers being recycled to the buffer pool, while the `RemoteInputChannel` is registered as a listener to that buffer pool, and adding the exclusive buffer triggers a floating buffer to be recycled back to the same buffer pool, a deadlock can occur: the two threads hold the locks on `LocalBufferPool#availableMemorySegments` and `RemoteInputChannel#bufferQueue` but acquire them in reverse order.
    
    One such instance would be (thanks @zhijiangW for finding this):
    
    ```
    Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
     -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
     -> try to lock(RemoteInputChannel2#bufferQueue)
    ```
    ```
    Task thread -> RemoteInputChannel2#recycle
     -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
     -> try to lock(LocalBufferPool#availableMemorySegments)
    ```
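The lock-order inversion above can be reproduced in isolation. The following Java sketch is a hypothetical model, not Flink code. `poolLock` stands in for the monitor on `LocalBufferPool#availableMemorySegments` and `queueLock` for `RemoteInputChannel2#bufferQueue`; all class and method names are made up for illustration. The sketch uses `ReentrantLock#tryLock` with a timeout so that it reports the deadlock instead of hanging. Two `CyclicBarrier`s ensure that both threads hold their first lock while they try to get the second one.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockInversionDemo {
    // Hypothetical stand-ins: poolLock ~ LocalBufferPool#availableMemorySegments,
    // queueLock ~ RemoteInputChannel2#bufferQueue.
    static final ReentrantLock poolLock = new ReentrantLock();
    static final ReentrantLock queueLock = new ReentrantLock();

    /**
     * Runs both acquisition orders from the description at the same time and returns
     * true if neither thread could get its second lock, i.e. plain synchronized
     * blocks (which cannot time out) would have deadlocked.
     */
    static boolean bothThreadsBlocked() {
        CyclicBarrier bothHoldFirstLock = new CyclicBarrier(2);
        CyclicBarrier bothTriedSecondLock = new CyclicBarrier(2);
        AtomicBoolean cancelerGotQueueLock = new AtomicBoolean();
        AtomicBoolean taskGotPoolLock = new AtomicBoolean();

        // "Task canceler thread": pool lock first, then the channel's queue lock.
        Thread canceler = new Thread(() -> lockInOrder(
                poolLock, queueLock, bothHoldFirstLock, bothTriedSecondLock, cancelerGotQueueLock));
        // "Task thread": queue lock first, then the pool lock (reverse order).
        Thread task = new Thread(() -> lockInOrder(
                queueLock, poolLock, bothHoldFirstLock, bothTriedSecondLock, taskGotPoolLock));
        canceler.start();
        task.start();
        try {
            canceler.join();
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !cancelerGotQueueLock.get() && !taskGotPoolLock.get();
    }

    private static void lockInOrder(ReentrantLock first, ReentrantLock second,
            CyclicBarrier holdBarrier, CyclicBarrier triedBarrier, AtomicBoolean gotSecond) {
        first.lock();
        try {
            holdBarrier.await(); // both threads now hold their first lock
            // tryLock with a timeout instead of lock(): lock() would block forever here
            if (second.tryLock(100, TimeUnit.MILLISECONDS)) {
                gotSecond.set(true);
                second.unlock();
            }
            triedBarrier.await(); // keep the first lock until both threads have tried
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("would deadlock: " + bothThreadsBlocked());
    }
}
```

With plain `synchronized` blocks, as in the two code paths above, neither thread could proceed at all.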
    
    This PR is a second approach to #6254. It solves the deadlock on the `LocalBufferPool` side, since the other solution turned out to be even more complex than what is already in that PR (I'll update that PR in a second).
    @pnowojski and @tillrohrmann can you also have a quick look so that this can get into 1.5.1?
    
    ## Brief change log
    
    - clarify the contract of `BufferListener#notifyBufferAvailable()` (see in the code)
    - make sure that `LocalBufferPool#recycle()` does not break this contract, i.e. call the listener's callback outside the lock around `LocalBufferPool#availableMemorySegments`
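The following is a minimal sketch of the contract described in this change log. It assumes a simplified pool, not the real `LocalBufferPool`. The names `SimplePool` and `SegmentListener` are hypothetical, and segments are plain `Object`s. The pool decides under its lock whether a listener is waiting, but it invokes `notifyBufferAvailable()` only after releasing the lock. It takes the lock again only to re-register the listener, or to take the segment back if the callback failed.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified listener modelled on BufferListener#notifyBufferAvailable().
interface SegmentListener {
    /** Returns true if the listener wants to stay registered for more segments. */
    boolean notifyBufferAvailable(Object segment);
}

class SimplePool {
    final Object lock = new Object(); // plays the role of the availableMemorySegments monitor
    final ArrayDeque<Object> availableSegments = new ArrayDeque<>();
    final ArrayDeque<SegmentListener> registeredListeners = new ArrayDeque<>();

    void addListener(SegmentListener listener) {
        synchronized (lock) {
            registeredListeners.add(listener);
        }
    }

    int availableCount() {
        synchronized (lock) {
            return availableSegments.size();
        }
    }

    void recycle(Object segment) {
        SegmentListener listener;
        synchronized (lock) {
            listener = registeredListeners.poll();
            if (listener == null) {
                availableSegments.add(segment);
                return;
            }
        }
        // The pool lock is released here: the callback may take its own locks
        // (e.g. a channel's queue lock) without creating a lock-order inversion.
        boolean success = false;
        boolean needMore = false;
        try {
            needMore = listener.notifyBufferAvailable(segment);
            success = true;
        } catch (Throwable ignored) {
            // handled below, under the lock
        }
        if (!success || needMore) {
            synchronized (lock) {
                if (!success) {
                    availableSegments.add(segment); // the listener did not take the segment
                }
                if (needMore) {
                    registeredListeners.add(listener);
                }
            }
        }
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool();
        boolean[] heldLock = {true};
        pool.addListener(segment -> {
            heldLock[0] = Thread.holdsLock(pool.lock);
            return false; // one segment is enough
        });
        pool.recycle(new Object()); // handed to the listener
        pool.recycle(new Object()); // no listener left: stays in the pool
        System.out.println("lock held during callback: " + heldLock[0]);     // false
        System.out.println("available segments: " + pool.availableCount()); // 1
    }
}
```

In the usage example, `Thread.holdsLock(pool.lock)` inside the callback is `false`. The listener can therefore acquire its own locks, such as a channel's `bufferQueue`, without creating a cycle.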
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - added `RemoteInputChannelTest#testConcurrentRecycleAndRelease2` which catches this deadlock quite quickly
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no** (per buffer, but we are only moving the listener callback out of the synchronized block, so if there is any effect, it should be positive)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
      - The S3 file system connector: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **JavaDocs**


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-9676-lbp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6257
    
----
commit d69ff968f8647efa13adcc6f338e411675b36d68
Author: Nico Kruber <ni...@...>
Date:   2018-07-04T15:45:18Z

    [FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock
    
    When exclusive buffers of a RemoteInputChannel are recycled concurrently
    with (other/floating) buffers being recycled to the buffer pool, while the
    RemoteInputChannel is registered as a listener to that buffer pool, and
    adding the exclusive buffer triggers a floating buffer to be recycled back
    to the same buffer pool, a deadlock can occur: the two threads hold the
    locks on LocalBufferPool#availableMemorySegments and
    RemoteInputChannel#bufferQueue but acquire them in reverse order.
    
    One such instance would be:
    
    Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
    	-> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
    	-> try to lock(RemoteInputChannel2#bufferQueue)
    
    Task thread -> RemoteInputChannel2#recycle
    	-> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
    	-> try to lock(LocalBufferPool#availableMemorySegments)
    
    Therefore, we decouple the listener callback from the lock around
    LocalBufferPool#availableMemorySegments and implicitly enforce that
    RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. it
    must be acquired first and should never be taken after locking
    LocalBufferPool#availableMemorySegments.
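The resulting lock order can be checked with a small stress run. This Java sketch is hypothetical, and all names in it are illustrative rather than Flink's:

- The "task thread" path holds `queueLock` while recycling to the pool. This is the permitted order.
- The "task canceler" path takes `poolLock` only to make its decision. It calls the listener callback after releasing that lock.

Both threads run concurrently and are joined with a timeout. Moving `notifyBufferAvailable()` back inside the `synchronized (poolLock)` block of `recycle` would let the two paths deadlock again.

```java
class LockOrderingDemo {
    // Hypothetical stand-ins: poolLock ~ LocalBufferPool#availableMemorySegments,
    // queueLock ~ RemoteInputChannel#bufferQueue.
    // Rule: queueLock may be held while taking poolLock, never the other way round.
    final Object poolLock = new Object();
    final Object queueLock = new Object();
    int poolSegments;  // guarded by poolLock
    int queuedBuffers; // guarded by queueLock

    // Listener callback (cf. notifyBufferAvailable): needs the channel's queue lock.
    void notifyBufferAvailable() {
        synchronized (queueLock) {
            queuedBuffers++;
        }
    }

    // Pool-side recycle: decides under poolLock, calls the listener after releasing it.
    void recycle(boolean listenerRegistered) {
        synchronized (poolLock) {
            if (!listenerRegistered) {
                poolSegments++;
                return;
            }
        }
        notifyBufferAvailable(); // poolLock is NOT held here
    }

    // Task-thread path: holds queueLock and recycles a floating buffer to the pool.
    void recycleExclusiveBuffer() {
        synchronized (queueLock) {
            queuedBuffers++;
            recycle(false); // queueLock -> poolLock: the permitted order
        }
    }

    /** Runs both paths concurrently; returns true if both threads finished in time. */
    boolean runConcurrently(int iterations) {
        Thread task = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                recycleExclusiveBuffer();
            }
        });
        Thread canceler = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                recycle(true);
            }
        });
        // daemon threads, so that a deadlocked (buggy) variant cannot keep the JVM alive
        task.setDaemon(true);
        canceler.setDaemon(true);
        task.start();
        canceler.start();
        try {
            task.join(10_000);
            canceler.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !task.isAlive() && !canceler.isAlive();
    }

    public static void main(String[] args) {
        LockOrderingDemo demo = new LockOrderingDemo();
        boolean finished = demo.runConcurrently(100_000);
        System.out.println("finished: " + finished
                + ", queued=" + demo.queuedBuffers + ", pooled=" + demo.poolSegments);
    }
}
```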

----


---

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200283214
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     
     	@Override
     	public void recycle(MemorySegment segment) {
    +		BufferListener listener;
     		synchronized (availableMemorySegments) {
     			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
     				returnMemorySegment(segment);
    +				return;
     			}
     			else {
    -				BufferListener listener = registeredListeners.poll();
    +				listener = registeredListeners.poll();
     
     				if (listener == null) {
     					availableMemorySegments.add(segment);
     					availableMemorySegments.notify();
    +					return;
     				}
    -				else {
    -					try {
    -						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -						if (needMoreBuffers) {
    -							registeredListeners.add(listener);
    -						}
    +			}
    +		}
    +
    +		// We do not know which locks have been acquired before the recycle() or are needed in the
    +		// notification and which other threads also access them.
    +		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    +		boolean success = false;
    +		boolean needMoreBuffers = false;
    +		try {
    +			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    +			success = true;
    +		} catch (Throwable ignored) {
    +			// handled below, under the lock
    +		}
    +
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
    --- End diff --
    
    actually, let's do this in a follow-up PR


---

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200274433
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
    --- End diff --
    
    true, this ~could~ should be done outside the lock as well
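The same idea can be applied to `notifyBufferDestroyed()`. In the following minimal sketch, the names are hypothetical and this is not the eventual Flink change. Only the decision between re-registering the listener and notifying it that the pool was destroyed is taken under the lock. The callback itself runs after the lock is released.

```java
import java.util.ArrayDeque;

class ReRegisterOrDestroy {
    // Hypothetical listener type modelled on BufferListener#notifyBufferDestroyed().
    interface Listener {
        void notifyBufferDestroyed();
    }

    final Object lock = new Object(); // stand-in for the availableMemorySegments monitor
    final ArrayDeque<Listener> registeredListeners = new ArrayDeque<>();
    boolean isDestroyed; // guarded by lock

    void destroy() {
        synchronized (lock) {
            isDestroyed = true;
        }
    }

    int registeredCount() {
        synchronized (lock) {
            return registeredListeners.size();
        }
    }

    /** Called after a listener answered that it needs more buffers. */
    void reRegister(Listener listener) {
        boolean notifyDestroyed;
        synchronized (lock) {
            // only the decision is made under the lock...
            notifyDestroyed = isDestroyed;
            if (!notifyDestroyed) {
                registeredListeners.add(listener);
            }
        }
        // ...the callback itself runs without holding it
        if (notifyDestroyed) {
            listener.notifyBufferDestroyed();
        }
    }
}
```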


---

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6257


---

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200251500
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
    --- End diff --
    
    Currently, `notifyBufferDestroyed` does nothing; if we implement it in the future, we should be as careful with it as with `notifyBufferAvailable`.


---

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200287730
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
     					}
    -					catch (Throwable ignored) {
    -						availableMemorySegments.add(segment);
    -						availableMemorySegments.notify();
    --- End diff --
    
    👍


---

[GitHub] flink issue #6257: [FLINK-9676][network] clarify contracts of BufferListener...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/6257
  
    Thanks for fixing this bug; we also solved this problem in the same way.
    
    This solution seems more lightweight than the approach in [6254](https://github.com/apache/flink/pull/6254), and I think the lock adjustments in `6254` are still valuable as a reference.


---

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200252253
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
     					}
    -					catch (Throwable ignored) {
    -						availableMemorySegments.add(segment);
    -						availableMemorySegments.notify();
    --- End diff --
    
    I am wondering whether we should rethrow this exception at the end, after the handling below.
    
    For example, if the `isWaitingForFloatingBuffers` flag is inconsistent during `RemoteInputChannel#notifyBufferAvailable`, we should throw the exception to trigger a failover; otherwise, we cannot detect the potential bug.
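The following is a minimal sketch of this suggestion. It assumes a simplified pool with hypothetical names and is not what was merged. The failed hand-over is cleaned up under the lock first, so the segment goes back to the pool. The original exception is then rethrown instead of being swallowed. Java's multi-catch with a precise rethrow keeps unchecked exceptions and errors unchanged, so no wrapping is needed.

```java
import java.util.ArrayDeque;

class RethrowAfterCleanup {
    // Hypothetical listener type modelled on BufferListener#notifyBufferAvailable().
    interface Listener {
        boolean notifyBufferAvailable(Object segment);
    }

    final Object lock = new Object(); // stand-in for the availableMemorySegments monitor
    final ArrayDeque<Object> availableSegments = new ArrayDeque<>();

    int availableCount() {
        synchronized (lock) {
            return availableSegments.size();
        }
    }

    /** Hands a segment to the listener and returns its "need more buffers" answer. */
    boolean handOver(Listener listener, Object segment) {
        try {
            return listener.notifyBufferAvailable(segment); // called without holding the lock
        } catch (RuntimeException | Error e) {
            synchronized (lock) {
                // cleanup first: the listener did not take the segment, keep it in the pool
                availableSegments.add(segment);
            }
            // then rethrow, so that e.g. an inconsistent channel state can trigger a failover
            throw e;
        }
    }
}
```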


---

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200287796
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
    --- End diff --
    
    👍


---

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200274038
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    +		if (!success || needMoreBuffers) {
    +			synchronized (availableMemorySegments) {
    +				if (isDestroyed) {
    +					// cleanup tasks how they would have been done if we only had one synchronized block
    +					if (needMoreBuffers) {
    +						listener.notifyBufferDestroyed();
     					}
    -					catch (Throwable ignored) {
    -						availableMemorySegments.add(segment);
    -						availableMemorySegments.notify();
    --- End diff --
    
    true, that's why I created FLINK-9755 for this issue and already have code (have to add tests though) - expect a PR soon


---