Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2016/09/22 12:55:20 UTC

[jira] [Commented] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

    [ https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513227#comment-15513227 ] 

Stephan Ewen commented on FLINK-4660:
-------------------------------------

We looked through this a bit, and the problem may be something else. Flink does not close the {{FileSystem}} objects, but it also caches them, so there should be only one {{FileSystem}} object per TaskManager.
The connections you see as open may be {{FSDataInputStream}} connections to S3 that reload the state. Previous versions of Flink did not ensure that these streams were closed in case the recovery was interrupted by another failure (such as a File Not Found due to eventual consistency).
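
For reference, a minimal sketch (not Flink code) of the Hadoop-level caching behaviour this relies on, assuming hadoop-common and the S3A filesystem are on the classpath; the bucket name is hypothetical:

{code:java}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCacheCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical bucket, for illustration only
        URI uri = URI.create("s3a://example-bucket/");

        // Hadoop caches FileSystem instances per scheme/authority/user,
        // so repeated get() calls return the same object unless
        // fs.s3a.impl.disable.cache is set to true.
        FileSystem fs1 = FileSystem.get(uri, conf);
        FileSystem fs2 = FileSystem.get(uri, conf);
        System.out.println("same cached instance: " + (fs1 == fs2)); // expected: true
    }
}
{code}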

The latest version of Flink more thoroughly closes these streams. Can you check if that fixes your problem?
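
To illustrate what closing these streams means, here is a sketch of the defensive pattern, not the actual Flink code; the class and method names are made up:

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RestoreSketch {
    // Read a state file, making sure the stream (and its pooled S3 connection)
    // is released even if the read is interrupted by another failure.
    static byte[] readStateFile(FileSystem fs, Path stateFile, int size) throws IOException {
        byte[] buffer = new byte[size];
        try (FSDataInputStream in = fs.open(stateFile)) {
            in.readFully(buffer); // an exception here still closes the stream
        }
        return buffer;
    }
}
{code}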

For the eventual consistency issue, let's continue the discussion in https://issues.apache.org/jira/browse/FLINK-4218

> HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4660
>                 URL: https://issues.apache.org/jira/browse/FLINK-4660
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Zhenzhong Xu
>            Priority: Critical
>         Attachments: Screen Shot 2016-09-20 at 2.49.14 PM.png, Screen Shot 2016-09-20 at 2.49.32 PM.png
>
>
> A Flink job with checkpointing enabled and configured to use the S3A file system backend sometimes experiences checkpointing failures due to S3 consistency issues. This behavior has also been reported by other people and is documented in https://issues.apache.org/jira/browse/FLINK-4218.
> The problem is magnified by the current HadoopFileSystem implementation, which can potentially leak S3 client connections and eventually drive the job into a restarting loop with a “Timeout waiting for a connection from pool” exception thrown from the AWS client.
> I looked at the code; it seems HadoopFileSystem.java never invokes close() on the fs object upon failure, yet the FileSystem may be re-initialized every time the job restarts.
> A few pieces of evidence I observed:
> 1. When I set the connection pool limit to 128, the commands below show 128 connections stuck in the CLOSE_WAIT state.
> !Screen Shot 2016-09-20 at 2.49.14 PM.png|align=left, vspace=5! 
> 2. Task manager logs indicate that the state backend file system is consistently re-initialized upon job restart.
> !Screen Shot 2016-09-20 at 2.49.32 PM.png!
> 3. Logs indicate an NPE during cleanup of the stream task, which was caused by a “Timeout waiting for connection from pool” exception when trying to create a directory in the S3 bucket.
> 2016-09-02 08:17:50,886 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of stream task
> java.lang.NullPointerException
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
> at java.lang.Thread.run(Thread.java:745)
> 4. It appears that from the point where StreamTask invokes the checkpointing operation to the point where it handles the failure, there is no logic for closing the Hadoop FileSystem object (which internally holds the AWS S3 client) residing in HadoopFileSystem.java (see the sketch below).
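
For illustration, a minimal sketch of the leak pattern described in point 4, assuming a hypothetical bucket and a deliberately small S3A connection pool:

{code:java}
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeakSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // small pool so the limit is reached quickly
        conf.setInt("fs.s3a.connection.maximum", 8);
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);

        // Streams that are opened but never closed keep their HTTP connection
        // checked out of the pool; once the pool is exhausted, further requests
        // fail with "Timeout waiting for connection from pool".
        List<FSDataInputStream> leaked = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            leaked.add(fs.open(new Path("s3a://example-bucket/checkpoints/state-" + i)));
        }
    }
}
{code}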



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)