Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/29 08:24:18 UTC

[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

    [ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354779#comment-15354779 ] 

ASF GitHub Bot commented on FLINK-4080:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2180

    [FLINK-4080][kinesis-connector] Guarantee exactly-once for Kinesis consumer when fail in middle of aggregated records

    If multiple Kinesis records were aggregated into a single record by the KPL, all of the subrecords obtained when the consumer deaggregates it will carry the same sequence number. This breaks the exactly-once guarantee of the `FlinkKinesisConsumer` if it happens to fail while still in the middle of processing the subrecords of a deaggregated record (the first subrecord's sequence number would incorrectly mark the whole aggregated batch as processed).
    
    To fix this, this PR changes the snapshot state type of `FlinkKinesisConsumer` from `HashMap<KinesisStreamShard, String>` to `HashMap<KinesisStreamShard, SequenceNumber>`.
    `SequenceNumber` is a new class representing the combination of a "main sequence number" and a "subsequence number". When the `ShardConsumerThread` starts consuming records, we check whether the last checkpointed record at the time of restore was an aggregated record. If so, we first handle the dangling subrecords.
    
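For illustration, the new state value type could look roughly like the sketch below. The class and method names here are assumptions for this example only; the actual class in the PR may differ.

```java
// Hypothetical sketch of a combined "main sequence number" + "subsequence
// number" position, as described above. Not the PR's actual code.
public class SequenceNumber {
    private final String sequenceNumber;   // main Kinesis sequence number
    private final long subSequenceNumber;  // index within an aggregated record; -1 if not aggregated

    public SequenceNumber(String sequenceNumber) {
        this(sequenceNumber, -1L);
    }

    public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    public String getSequenceNumber() { return sequenceNumber; }

    public long getSubSequenceNumber() { return subSequenceNumber; }

    /** True if this position refers to a subrecord of an aggregated record. */
    public boolean isAggregated() { return subSequenceNumber >= 0; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SequenceNumber)) return false;
        SequenceNumber other = (SequenceNumber) o;
        return sequenceNumber.equals(other.sequenceNumber)
            && subSequenceNumber == other.subSequenceNumber;
    }

    @Override
    public int hashCode() {
        return 37 * sequenceNumber.hashCode() + Long.hashCode(subSequenceNumber);
    }

    @Override
    public String toString() {
        return isAggregated() ? sequenceNumber + ":" + subSequenceNumber : sequenceNumber;
    }
}
```

Because the snapshot state is a `HashMap` keyed by shard, a value type like this needs correct `equals`/`hashCode` so that restored positions compare by value, not identity.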
    @rmetzger I'm adding this change to the original Kinesis connector in `master` instead of waiting for the big PR #2131 to be merged, because I think this is a bug we must fix before 1.1, and I'm not sure if #2131 will be merged before the RC for 1.1 comes out. Depending on whether #2131 or this PR gets merged first, I'll rebase the other one accordingly.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4080

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2180
    
----
commit a9593677a4c73c9475b1e85204002d6470f2115a
Author: Gordon Tai <tz...@gmail.com>
Date:   2016-06-29T07:46:35Z

    [FLINK-4080] Guarantee exactly-once for Kinesis consumer for failures in the middle of aggregated records

----


> Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4080
>                 URL: https://issues.apache.org/jira/browse/FLINK-4080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> I've occasionally seen ManualExactlyOnceTest fail after several runs.
> Kinesis records of the same aggregated batch will have the same sequence number and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current consumer code commits state every time it finishes processing a record, including individual de-aggregated sub-records. This is a bug, since it incorrectly marks all remaining records of the de-aggregated batch as processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to indicate that the last seen sequence number of a shard may refer to a sub-record of a de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record of the 5th record was last seen for shard 0. On restore, we start again from record 5 for shard 0 and skip the already-seen sub-records; for shard 1, however, we start from record 3, since record 2 is non-aggregated and already fully processed.
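The restore rule in point 2 can be sketched as follows. This is a minimal illustration only, not the PR's actual code: records are modeled as strings ("seq" for non-aggregated, "seq:sub" for a de-aggregated sub-record), and it assumes the checkpointed sub-record itself was fully processed before the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the restore rule described above.
// Record representation and names are assumptions for this sketch.
public class RestoreSkipExample {

    /**
     * Given the records re-read from a shard after restore and the
     * checkpointed position (e.g. "5:8" = sub-record 8 of record 5 was
     * processed, or "2" = record 2 fully processed), return only the
     * records that still need processing.
     */
    public static List<String> recordsToProcess(List<String> reRead, String checkpointed) {
        String[] parts = checkpointed.split(":");
        long lastSeq = Long.parseLong(parts[0]);
        long lastSub = (parts.length == 2) ? Long.parseLong(parts[1]) : -1;

        List<String> remaining = new ArrayList<>();
        for (String record : reRead) {
            String[] r = record.split(":");
            long seq = r.length > 0 ? Long.parseLong(r[0]) : -1;
            long sub = (r.length == 2) ? Long.parseLong(r[1]) : -1;

            if (seq > lastSeq) {
                remaining.add(record);        // strictly after the checkpointed record
            } else if (seq == lastSeq && sub > lastSub) {
                remaining.add(record);        // dangling sub-record of the same aggregated record
            }
            // otherwise: processed before the failure, skip it
        }
        return remaining;
    }
}
```

With a checkpoint of "5:8", sub-records up to and including "5:8" are skipped and processing resumes at "5:9"; with a checkpoint of "2", processing resumes at record "3", matching the two shard cases above.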



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)