You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2018/10/09 16:02:00 UTC

[jira] [Commented] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

    [ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643673#comment-16643673 ] 

Nico Kruber commented on FLINK-10367:
-------------------------------------

As a rough sketch for option 3 (maybe this can be optimised):
{code}
	public enum NotificationResult {
		NONE(false, false),
		BUFFER_USED_FINISHED(true, false),
		BUFFER_USED_NEED_MORE(true, true);

		private final boolean bufferUsed;
		private final boolean needsMoreBuffers;

		NotificationResult(boolean bufferUsed, boolean needsMoreBuffers) {
			this.bufferUsed = bufferUsed;
			this.needsMoreBuffers = needsMoreBuffers;
		}

		public boolean bufferUsed() {
			return bufferUsed;
		}

		public boolean needsMoreBuffers() {
			return needsMoreBuffers;
		}
	}

	@Override
	public void recycle(MemorySegment segment) {
		BufferListener listener;
		NetworkBuffer buffer;
		NotificationResult notificationResult = NotificationResult.NONE; // some enum
		while (notificationResult.bufferUsed()) {
			synchronized (availableMemorySegments) {
				if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
					returnMemorySegment(segment);
					return;
				} else {
					listener = registeredListeners.poll();

					if (listener == null) {
						availableMemorySegments.add(segment);
						availableMemorySegments.notify();
						return;
					}
				}
			}

			// We do not know which locks have been acquired before the recycle() or are needed in the
			// notification and which other threads also access them.
			// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
			// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
			// (either directly or later during error handling) and therefore eventually end up in this
			// method again.
			if (buffer == null) {
				buffer = new NetworkBuffer(segment, this);
			}
			notificationResult = listener.notifyBufferAvailable(buffer);

			if (notificationResult.needsMoreBuffers()) {
				synchronized (availableMemorySegments) {
					if (isDestroyed) {
						// cleanup tasks how they would have been done if we only had one synchronized block
						listener.notifyBufferDestroyed();
					} else {
						registeredListeners.add(listener);
					}
				}
			}
		}
	}
{code}

Not really nice, but maybe better than adding code that ignores exceptions in particular cases...

> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
>                 Key: FLINK-10367
>                 URL: https://issues.apache.org/jira/browse/FLINK-10367
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> For task failure or canceling, the {{SingleInputGate#releaseAllResources}} will be invoked before task exits.
> In the process of {{SingleInputGate#releaseAllResources}}, we first loop to release all the input channels, then destroy the {{BufferPool}}.  For {{RemoteInputChannel#releaseAllResources}}, it will return floating buffers to the {{BufferPool}} {{which assigns this recycled buffer to the other listeners(RemoteInputChannel}}). 
> It may exist recursive call in this process. If the listener is already released before, it will directly recycle this buffer to the {{BufferPool}} which takes another listener to notify available buffer. The above process may be invoked repeatedly in recursive way.
> If there are many input channels as listeners in the {{BufferPool}}, it will cause {{StackOverflow}} error because of recursion. And in our testing job, the scale of 10,000 input channels ever caused this error.
> I think of two ways for solving this potential problem:
>  # When the input channel is released, it should notify the {{BufferPool}} of unregistering this listener, otherwise it is inconsistent between them.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to release all the internal input channels. To do so, all the listeners in {{BufferPool}} will be removed during destroying, and the input channel will not have further interactions during {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way to solve this problem, because we do not want to expand another interface method for removing buffer listener, further currently the internal data structure in {{BufferPool}} can not support remove a listener directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)