Posted to issues@flink.apache.org by "Chris Slotterback (JIRA)" <ji...@apache.org> on 2019/05/08 16:50:00 UTC

[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

    [ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835749#comment-16835749 ] 

Chris Slotterback commented on FLINK-10455:
-------------------------------------------

I was able to reproduce this by forcing the job into a failed state: I made a checkpoint write to the filesystem time out. We occasionally see latency spikes in our environment that result in these job restarts. Most jobs recover fine, but when exactly-once is enabled for the Kafka producer, the job gets stuck in this loop. My assumption is that any job failure will reach the client processDisconnect method after the class loader is gone.

> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we may get a {{ProducerFencedException}}. Documentation around {{ProducerFencedException}} explicitly states that we should close the producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} simply iterates over the {{pendingCommitTransactions}} which is not touched during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger around.
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
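
To make the cleanup concern in the description concrete, here is a minimal, self-contained sketch of a commit loop that keeps going after one commit fails and closes every handle it visits. It is not the Flink implementation or the fix that was merged; PendingCommitSketch, Txn, FakeTxn and commitUpTo are hypothetical names used only for illustration, and closing and dropping a transaction whose commit failed is an assumption of this sketch.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class PendingCommitSketch {

    /** Hypothetical stand-in for a transaction handle that owns a producer. */
    public interface Txn {
        void commit() throws Exception;
        void close();
    }

    /** Test double: optionally fails on commit and records what happened. */
    public static class FakeTxn implements Txn {
        public final boolean failOnCommit;
        public boolean committed;
        public boolean closed;

        public FakeTxn(boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        @Override
        public void commit() {
            if (failOnCommit) {
                throw new IllegalStateException("simulated fenced producer");
            }
            committed = true;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Tries to commit every pending transaction with an id up to checkpointId.
     * A failed commit does not stop the loop. Each visited handle is closed and
     * removed (an assumption of this sketch), and the first failure is rethrown
     * after all entries were visited, with later failures attached as suppressed.
     */
    public static void commitUpTo(Map<Long, Txn> pending, long checkpointId) throws Exception {
        Exception firstError = null;
        Iterator<Map.Entry<Long, Txn>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, leave it pending
            }
            Txn txn = entry.getValue();
            try {
                txn.commit();
            } catch (Exception e) {
                if (firstError == null) {
                    firstError = e;
                } else {
                    firstError.addSuppressed(e);
                }
            } finally {
                txn.close(); // release the producer whether or not commit worked
                it.remove();
            }
        }
        if (firstError != null) {
            throw firstError;
        }
    }

    public static void main(String[] args) {
        Map<Long, Txn> pending = new LinkedHashMap<>();
        pending.put(1L, new FakeTxn(true));  // commit fails
        pending.put(2L, new FakeTxn(false)); // still gets committed and closed
        pending.put(3L, new FakeTxn(false)); // later checkpoint, untouched
        try {
            commitUpTo(pending, 2L);
        } catch (Exception e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        System.out.println("still pending: " + pending.keySet());
    }
}
```

Whether a failed transaction should be dropped or retried depends on the failure. The sketch drops it because the description notes that a producer should be closed after a {{ProducerFencedException}}.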



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)