Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/02/06 16:55:42 UTC

[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints

    [ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854349#comment-15854349 ] 

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3278

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

    Prior to this change, the FlinkKafkaProducer could violate at-least-once because checkpoints were not failed when the Kafka producer had reported async exceptions.
    
    With this PR, `snapshotState()` fails if an async exception was recorded before the checkpoint. It also fails if flushing the pending records on checkpoint results in exceptions.
    
    This PR also makes the tests in `FlinkKafkaProducerBaseTest` more stable by using one-shot latches instead of sleeping. It also removes the test `testAtLeastOnceProducerFailsIfFlushingDisabled()`. My reasoning is that you _might_ still get at-least-once even if flushing is disabled (i.e. when there happen to be no pending records to flush on checkpoint), so I don't see the necessity of having that test. I'm open to discussing the removal of that test and adding it back if others think it's necessary.
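
The latch-instead-of-sleep idea mentioned above can be sketched roughly as follows. This is only an illustration and not the code in `FlinkKafkaProducerBaseTest`: the class name, the `run()` method, and the two latches are hypothetical, and `java.util.concurrent.CountDownLatch` is used as a stand-in for a one-shot latch so the example has no Flink dependency.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all names here are illustrative, not taken from the PR.
final class LatchInsteadOfSleepSketch {

    // Returns true if the worker was observed blocked in its simulated flush
    // and finished only after the test released it.
    static boolean run() {
        CountDownLatch enteredFlush = new CountDownLatch(1); // worker -> test signal
        CountDownLatch releaseFlush = new CountDownLatch(1); // test -> worker signal

        Thread snapshotThread = new Thread(() -> {
            enteredFlush.countDown();          // announce that the "flush" has started
            try {
                releaseFlush.await();          // block like a flush waiting on callbacks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        snapshotThread.start();

        try {
            // Wait for the signal instead of sleeping for a guessed duration.
            boolean entered = enteredFlush.await(5, TimeUnit.SECONDS);
            boolean stillBlocked = snapshotThread.isAlive();
            releaseFlush.countDown();          // simulate the pending callbacks completing
            snapshotThread.join(5000);
            return entered && stillBlocked && !snapshotThread.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The test thread proceeds as soon as the worker signals, so the outcome does not depend on how long a sleep happens to be relative to thread scheduling.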

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5701

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3278
    
----
commit c3eb0a905b86c6265af91c4bda7d2c9da2dc6ce2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-02-06T16:37:13Z

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

----


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each invoke() and decremented on each callback. It is used to check whether the producer needs to wait for pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful iff, after flushing, {{pendingRecords == 0}} and {{asyncException == null}} (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the {{snapshotState}} method both before flushing and after flushing, once {{pendingRecords}} has reached 0.
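
The check described in the issue can be sketched as a small single-threaded model. This is a sketch under stated assumptions, not the actual Flink implementation: the class, the `completeOne()` helper, and the boolean "send will fail" flag are hypothetical stand-ins for the Kafka producer and its callbacks. Only `pendingRecords`, `asyncException`, `invoke()`, and `snapshotState()` are names taken from the issue text, and a real sink would also need to synchronize access from the callback thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the sink; not Flink's actual class.
final class CheckpointingProducerSketch {
    // Simulated in-flight sends: true means the send will fail in its callback.
    private final Queue<Boolean> inFlight = new ArrayDeque<>();
    private long pendingRecords;
    private Exception asyncException;

    // Called per record: the record stays pending until its callback fires.
    void invoke(boolean sendWillFail) {
        pendingRecords++;
        inFlight.add(sendWillFail);
    }

    // Simulates the producer callback for the oldest in-flight send.
    void completeOne() {
        boolean failed = inFlight.remove();
        if (failed && asyncException == null) {
            asyncException = new Exception("simulated send failure");
        }
        pendingRecords--;
    }

    // Simulates flush(): returns once every in-flight send has called back.
    void flush() {
        while (!inFlight.isEmpty()) {
            completeOne();
        }
    }

    void snapshotState() {
        checkErroneous(); // failures recorded before this checkpoint
        flush();
        if (pendingRecords != 0) {
            throw new IllegalStateException(
                "Pending record count must be zero after flush: " + pendingRecords);
        }
        checkErroneous(); // failures surfaced by the flush itself
    }

    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null; // do not rethrow the same failure on the next call
            throw new RuntimeException("Failed to send data: " + e.getMessage(), e);
        }
    }

    long pendingRecords() {
        return pendingRecords;
    }
}
```

In this model, a checkpoint that only checked `pendingRecords == 0` would succeed in both failure cases (a failure recorded before the checkpoint, and a failure reported during the flush), which is the data-loss scenario the issue describes.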


