Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2018/10/09 16:02:00 UTC
[jira] [Commented] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate
[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643673#comment-16643673 ]
Nico Kruber commented on FLINK-10367:
-------------------------------------
As a rough sketch for option 3 (maybe this can be optimised):
{code}
// Proposed return type of BufferListener#notifyBufferAvailable() for option 3
public enum NotificationResult {
    // the listener did not use the buffer (e.g. because it has already been released)
    NONE(false, false),
    // the listener used the buffer and does not need more buffers
    BUFFER_USED_FINISHED(true, false),
    // the listener used the buffer and wants to stay registered for more
    BUFFER_USED_NEED_MORE(true, true);

    private final boolean bufferUsed;
    private final boolean needsMoreBuffers;

    NotificationResult(boolean bufferUsed, boolean needsMoreBuffers) {
        this.bufferUsed = bufferUsed;
        this.needsMoreBuffers = needsMoreBuffers;
    }

    public boolean bufferUsed() {
        return bufferUsed;
    }

    public boolean needsMoreBuffers() {
        return needsMoreBuffers;
    }
}

@Override
public void recycle(MemorySegment segment) {
    BufferListener listener;
    // created lazily for the first listener and reused if a listener does not use it
    NetworkBuffer buffer = null;
    // start with NONE so that the loop body runs at least once
    NotificationResult notificationResult = NotificationResult.NONE;
    // iterate (instead of recursing) until some listener actually uses the segment
    while (!notificationResult.bufferUsed()) {
        synchronized (availableMemorySegments) {
            if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                returnMemorySegment(segment);
                return;
            } else {
                listener = registeredListeners.poll();
                if (listener == null) {
                    availableMemorySegments.add(segment);
                    availableMemorySegments.notify();
                    return;
                }
            }
        }
        // We do not know which locks have been acquired before the recycle() or are needed in the
        // notification and which other threads also access them.
        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
        // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
        // (either directly or later during error handling) and therefore eventually end up in this
        // method again.
        if (buffer == null) {
            buffer = new NetworkBuffer(segment, this);
        }
        notificationResult = listener.notifyBufferAvailable(buffer);
        if (notificationResult.needsMoreBuffers()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed) {
                    // clean up as it would have been done if we only had one synchronized block
                    listener.notifyBufferDestroyed();
                } else {
                    registeredListeners.add(listener);
                }
            }
        }
    }
}
{code}
Not really nice, but maybe better than adding code that ignores exceptions in particular cases...
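The key change in the sketch is that recycle() iterates instead of recursing: a listener that can no longer use the buffer returns NONE, and the loop offers the segment to the next registered listener. The toy model below is a simplification under stated assumptions, not Flink's actual LocalBufferPool; ToyPool, ToyListener and Result are hypothetical names, and locking, memory segments and pool destruction are left out so only the control flow remains.

```java
import java.util.ArrayDeque;

public class IterativeRecycleDemo {

    /** Hypothetical stand-in for the proposed NotificationResult enum. */
    enum Result {
        NONE(false, false),
        BUFFER_USED_FINISHED(true, false),
        BUFFER_USED_NEED_MORE(true, true);

        final boolean bufferUsed;
        final boolean needsMoreBuffers;

        Result(boolean bufferUsed, boolean needsMoreBuffers) {
            this.bufferUsed = bufferUsed;
            this.needsMoreBuffers = needsMoreBuffers;
        }
    }

    /** Hypothetical listener: a released listener rejects the buffer instead of recycling it. */
    static final class ToyListener {
        final boolean released;
        int buffersReceived;

        ToyListener(boolean released) {
            this.released = released;
        }

        Result notifyBufferAvailable() {
            if (released) {
                return Result.NONE; // hand the buffer back to the loop, no nested recycle()
            }
            buffersReceived++;
            return Result.BUFFER_USED_FINISHED;
        }
    }

    /** Hypothetical pool: recycle() loops over listeners until one uses the buffer. */
    static final class ToyPool {
        final ArrayDeque<ToyListener> registeredListeners = new ArrayDeque<>();
        int availableBuffers;

        void recycle() {
            Result result = Result.NONE;
            while (!result.bufferUsed) {
                ToyListener listener = registeredListeners.poll();
                if (listener == null) {
                    availableBuffers++; // nobody is waiting: keep the buffer in the pool
                    return;
                }
                result = listener.notifyBufferAvailable();
                if (result.needsMoreBuffers) {
                    registeredListeners.add(listener); // stay registered for more buffers
                }
            }
        }
    }

    /** Queues `releasedCount` released listeners ahead of one live listener, then recycles once. */
    public static ToyListener run(int releasedCount) {
        ToyPool pool = new ToyPool();
        for (int i = 0; i < releasedCount; i++) {
            pool.registeredListeners.add(new ToyListener(true));
        }
        ToyListener live = new ToyListener(false);
        pool.registeredListeners.add(live);
        pool.recycle(); // stack depth does not depend on releasedCount
        return live;
    }

    public static void main(String[] args) {
        ToyListener live = run(100_000);
        // prints "live listener received 1 buffer(s)"
        System.out.println("live listener received " + live.buffersReceived + " buffer(s)");
    }
}
```

Even with 100,000 released listeners queued ahead of the live one, the stack depth stays constant, because each released listener costs one more loop iteration rather than one more nested recycle() call.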
> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> On task failure or cancellation, {{SingleInputGate#releaseAllResources}} is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop over all input channels to release them, then destroy the {{BufferPool}}. {{RemoteInputChannel#releaseAllResources}} returns its floating buffers to the {{BufferPool}}, which assigns each recycled buffer to other listeners ({{RemoteInputChannel}} instances).
> This process can recurse: if a listener has already been released, it directly recycles the buffer back to the {{BufferPool}}, which then picks another listener to notify of the available buffer. This may repeat recursively.
> If many input channels are registered as listeners in the {{BufferPool}}, the recursion can cause a {{StackOverflowError}}. In our test job, 10,000 input channels did cause this error.
> I can think of two ways to solve this potential problem:
> # When an input channel is released, it should notify the {{BufferPool}} to unregister it as a listener; otherwise the two are left in an inconsistent state.
> # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to release all the internal input channels. This way, all listeners are removed from the {{BufferPool}} when it is destroyed, and the input channels will not interact with it further during {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way, because we do not want to add another interface method for removing a buffer listener; furthermore, the current internal data structure of {{BufferPool}} cannot remove a listener directly.
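The recursion described in the issue, and the effect of destroying the pool first (the second way), can be illustrated with a toy model: channels that are already released stay registered as listeners, and when another channel returns a floating buffer, the pool hands it down the chain of released listeners, one nested recycle() call each. RecursivePool, Channel and maxDepth() are hypothetical simplifications, not Flink classes; the model only counts nesting depth rather than overflowing the stack.

```java
import java.util.ArrayDeque;

public class RecursiveRecycleDemo {

    /** Hypothetical pool whose recycle() synchronously hands the buffer to the next listener. */
    static final class RecursivePool {
        final ArrayDeque<Channel> listeners = new ArrayDeque<>();
        int availableBuffers;
        int depth;    // current nesting of recycle() calls
        int maxDepth; // deepest nesting observed so far

        void recycle() {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            try {
                Channel next = listeners.poll();
                if (next == null) {
                    availableBuffers++; // no listener waiting: keep the buffer
                } else {
                    next.notifyBufferAvailable(); // may re-enter recycle()
                }
            } finally {
                depth--;
            }
        }

        /** Second way: destroying the pool drops every registered listener. */
        void destroy() {
            listeners.clear();
        }
    }

    /** Hypothetical input channel that can be registered as a buffer listener. */
    static final class Channel {
        final RecursivePool pool;
        int floatingBuffers;
        boolean released;

        Channel(RecursivePool pool, int floatingBuffers) {
            this.pool = pool;
            this.floatingBuffers = floatingBuffers;
        }

        void notifyBufferAvailable() {
            if (released) {
                pool.recycle(); // already released: give the buffer straight back (recursion)
            } else {
                floatingBuffers++;
            }
        }

        void releaseAllResources() {
            released = true;
            while (floatingBuffers > 0) {
                floatingBuffers--;
                pool.recycle();
            }
        }
    }

    /**
     * n channels wait for buffers as listeners; one more channel holds a floating buffer.
     * All channels are released in order; returns the deepest recycle() nesting.
     */
    public static int maxDepth(int n, boolean destroyPoolFirst) {
        RecursivePool pool = new RecursivePool();
        Channel[] channels = new Channel[n + 1];
        for (int i = 0; i < n; i++) {
            channels[i] = new Channel(pool, 0);
            pool.listeners.add(channels[i]);
        }
        channels[n] = new Channel(pool, 1); // holds one floating buffer, not a listener
        if (destroyPoolFirst) {
            pool.destroy();
        }
        for (Channel channel : channels) {
            channel.releaseAllResources();
        }
        return pool.maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("release channels first: " + maxDepth(1_000, false)); // 1001
        System.out.println("destroy pool first:     " + maxDepth(1_000, true));  // 1
    }
}
```

The nesting grows linearly with the number of released listeners, so a large enough n turns it into a StackOverflowError; the exact threshold depends on the thread stack size (-Xss) and on the frame sizes of the real call chain.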