You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yuxin Tan (Jira)" <ji...@apache.org> on 2022/03/31 09:17:00 UTC

[jira] [Commented] (FLINK-26555) Missing close in FileSystemJobResultStore

    [ https://issues.apache.org/jira/browse/FLINK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515188#comment-17515188 ] 

Yuxin Tan commented on FLINK-26555:
-----------------------------------

[~mapohl] , hello, about the issue, we encountered an exception as follows. Could you please help take a look?

When a job is finished in a session cluster, the job result may flush failed.
!image-2022-03-31-16-43-56-322.png|width=819,height=423!
{code:java}
mapper.writeValue(os, new JsonJobResultEntry(jobResultEntry)); {code}
About this line in the patch, I checked the source code and found it called the method 
{code:java}
this._writeValueAndClose(this.createGenerator(out, JsonEncoding.UTF8), value); {code}
 and an _UTF8JsonGenerator_ is inited and used. 
{code:java}
protected final void _writeValueAndClose(JsonGenerator g, Object value) throws IOException {
    SerializationConfig cfg = this.getSerializationConfig();
    if (cfg.isEnabled(SerializationFeature.CLOSE_CLOSEABLE) && value instanceof Closeable) {
        this._writeCloseable(g, value, cfg);
    } else {
        try {
            this._serializerProvider(cfg).serializeValue(g, value);
        } catch (Exception var5) {
            ClassUtil.closeOnFailAndThrowAsIOE(g, var5);
            return;
        }

        g.close();
    }
} {code}
The _UTF8JsonGenerator#close_ will be called finally and I found the OutputStream may be closed in the method when some features of Json generator is enabled.
{code:java}
public void close() throws IOException {
    ...
    if (this._outputStream != null) {
        if (!this._ioContext.isResourceManaged() && !this.isEnabled(Feature.AUTO_CLOSE_TARGET)) {
            if (this.isEnabled(Feature.FLUSH_PASSED_TO_STREAM)) {
                this._outputStream.flush();
            }
        } else {
            this._outputStream.close();
        }
    }
    ...
} {code}
If the output stream is closed after {_}writeValue{_}, the above _ClosedChannelException_ may be thrown when calling the _flush_ method added. 
{code:java}
os.flush(); {code}
I found this patch has changed  the initialization of the output stream to the try-with-resource mode. Generally, the data will be flushed before the file system is closed. Could we delete this line of code  _os.flush();_ to avoid the exception?

[~mapohl] , WDYT about the exception and could you help take a look when having free time? If I missed something, please correct it at any time. Thanks very much.

 

> Missing close in FileSystemJobResultStore
> -----------------------------------------
>
>                 Key: FLINK-26555
>                 URL: https://issues.apache.org/jira/browse/FLINK-26555
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.0
>            Reporter: Matthias Pohl
>            Assignee: Matthias Pohl
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>         Attachments: image-2022-03-31-16-39-44-189.png, image-2022-03-31-16-42-56-530.png, image-2022-03-31-16-43-56-322.png
>
>
> {{FileSystemJobResultStore.createDirtyResultInternal}} does not close the opened {{OutputStream}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)