Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2017/02/22 17:25:44 UTC
[jira] [Resolved] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai resolved FLINK-5701.
----------------------------------------
Resolution: Fixed
Assignee: Tzu-Li (Gordon) Tai
Fix Version/s: 1.2.1, 1.3.0
Resolved for {{master}} via http://git-wip-us.apache.org/repos/asf/flink/commit/646490c
Resolved for {{release-1.2}} via http://git-wip-us.apache.org/repos/asf/flink/commit/576cc89
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Fix For: 1.3.0, 1.2.1
>
>
> Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each {{invoke()}} and decremented on each send callback. It is used to decide whether the producer must wait for outstanding callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful if and only if, after flushing, {{pendingRecords == 0}} and {{asyncException == null}} (currently, we're only checking {{pendingRecords}}).
> A quick fix for this is to check for and rethrow async exceptions in the {{snapshotState}} method both before flushing and after flushing, once {{pendingRecords}} has reached 0.
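The check-before-and-after-flush pattern described in the issue can be sketched in Java as below. This is a minimal, self-contained illustration under stated assumptions, not the code from the linked commits: the class name ProducerCheckpointSketch, the simulateFailure flag and the in-memory list standing in for the KafkaProducer send buffer are all hypothetical, chosen so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the FLINK-5701 checkpoint check. NOT the actual
 * FlinkKafkaProducerBase code: a fake in-memory buffer stands in for
 * KafkaProducer so the example runs without a broker.
 */
class ProducerCheckpointSketch {

    private final Object pendingRecordsLock = new Object();
    private long pendingRecords = 0;                  // ++ in invoke(), -- in the send callback
    private volatile Exception asyncException = null; // first failure reported by a callback

    // Hypothetical client buffer: one entry per unacknowledged send; true = that send fails.
    private final List<Boolean> inFlight = new ArrayList<>();

    /** Called once per record, like SinkFunction#invoke; the send completes later. */
    void invoke(String record, boolean simulateFailure) throws Exception {
        checkErroneous(); // fail fast on an error reported by an earlier send
        synchronized (pendingRecordsLock) {
            pendingRecords++;
        }
        inFlight.add(simulateFailure); // buffered; its callback fires on flush()
    }

    /** Send callback: remember the first error, then decrement the pending count. */
    private void onCompletion(Exception e) {
        if (e != null && asyncException == null) {
            asyncException = e;
        }
        synchronized (pendingRecordsLock) {
            pendingRecords--;
            pendingRecordsLock.notifyAll(); // wake a snapshotState() waiting for 0
        }
    }

    /** Stand-in for KafkaProducer#flush(): completes every buffered send. */
    private void flush() {
        List<Boolean> toComplete = new ArrayList<>(inFlight);
        inFlight.clear();
        for (boolean fail : toComplete) {
            onCompletion(fail ? new Exception("simulated send failure") : null);
        }
    }

    /** Checkpoint hook: succeeds iff pendingRecords == 0 AND asyncException == null. */
    void snapshotState() throws Exception {
        checkErroneous(); // errors reported before this checkpoint started
        flush();
        synchronized (pendingRecordsLock) {
            while (pendingRecords != 0) {
                pendingRecordsLock.wait();
            }
        }
        checkErroneous(); // errors reported by callbacks that fired during the flush
    }

    /** Rethrows a stored async error, wrapped so the checkpoint (or invoke) fails. */
    private void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            throw new Exception("Failed to send data to Kafka", e);
        }
    }

    long getPendingRecords() {
        synchronized (pendingRecordsLock) {
            return pendingRecords;
        }
    }

    public static void main(String[] args) throws Exception {
        ProducerCheckpointSketch lossy = new ProducerCheckpointSketch();
        lossy.invoke("a", false);
        lossy.invoke("b", true); // this send will fail asynchronously
        try {
            lossy.snapshotState();
            System.out.println("checkpoint succeeded (data lost!)");
        } catch (Exception e) {
            // prints "checkpoint failed: simulated send failure"
            System.out.println("checkpoint failed: " + e.getCause().getMessage());
        }
    }
}
```

Dropping the second checkErroneous() call reproduces the reported bug in this sketch: the flush drives pendingRecords to 0, so main would print "checkpoint succeeded (data lost!)" even though record "b" was never written.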
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)