Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/23 08:11:00 UTC

[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

    [ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335479#comment-16335479 ] 

ASF GitHub Bot commented on FLINK-8484:
---------------------------------------

GitHub user pluppens opened a pull request:

    https://github.com/apache/flink/pull/5337

    [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot restoration is able to handle recently closed shards

    FLINK-8484: ensure that a change in the `StreamShardMetadata` other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not prevent the shard's state from being restored. This handles the corner case where a shard has been closed (its ending sequence number set to a non-null value) since the last savepoint or checkpoint, and the job is then restarted from that snapshot.
    
    ## Brief change log
     - Created a new method to perform the sequence number lookup
     - Ensured that the lookup for a given existing Kinesis shard does not rely on `equals()`, but instead checks equality on the stream name and shard id only (see the sketch below)
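
    A minimal sketch of what such a lookup could look like (the method name follows the new unit test, but the class, signature, and placement are assumptions, not necessarily the code in this PR):

        import java.util.Map;
        import java.util.Objects;

        import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
        import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

        // Sketch only: match a rediscovered shard against the restored state on stream
        // name and shard id, ignoring fields such as the ending sequence number that
        // may have changed since the snapshot was taken.
        final class RestoredShardLookup {

            static SequenceNumber findSequenceNumberToRestoreFrom(
                    StreamShardMetadata discoveredShard,
                    Map<StreamShardMetadata, SequenceNumber> restoredState) {
                for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : restoredState.entrySet()) {
                    StreamShardMetadata restoredShard = entry.getKey();
                    if (Objects.equals(restoredShard.getStreamName(), discoveredShard.getStreamName())
                            && Objects.equals(restoredShard.getShardId(), discoveredShard.getShardId())) {
                        return entry.getValue();
                    }
                }
                return null; // unknown shard: fall back to the configured initial position
            }
        }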
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
     - A new unit test, `testFindSequenceNumberToRestoreFrom()`, was added to `FlinkKinesisConsumerTest` to exercise the lookup mechanism (an illustrative version is sketched below)
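
    For illustration only, a test along these lines would cover the corner case against the lookup sketched above; the actual test in `FlinkKinesisConsumerTest` may be structured differently, and the shard id and sequence numbers here are made-up values:

        // Assumes the RestoredShardLookup sketch above plus org.junit.Test,
        // org.junit.Assert.*, java.util.HashMap/Map, and the Flink Kinesis model classes.
        @Test
        public void restoredClosedShardIsStillMatched() {
            StreamShardMetadata openShard = new StreamShardMetadata();
            openShard.setStreamName("my-stream");
            openShard.setShardId("shardId-000000000000");

            Map<StreamShardMetadata, SequenceNumber> restoredState = new HashMap<>();
            restoredState.put(openShard, new SequenceNumber("42"));

            // The same shard rediscovered after it was closed: only the ending
            // sequence number differs, so containsKey()/equals() no longer matches.
            StreamShardMetadata closedShard = new StreamShardMetadata();
            closedShard.setStreamName("my-stream");
            closedShard.setShardId("shardId-000000000000");
            closedShard.setEndingSequenceNumber("99");

            assertFalse(restoredState.containsKey(closedShard));
            assertEquals(new SequenceNumber("42"),
                RestoredShardLookup.findSequenceNumberToRestoreFrom(closedShard, restoredState));
        }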
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5337
    
----
commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens <ph...@...>
Date:   2018-01-23T08:00:23Z

    FLINK-8484: ensure that a change in the `StreamShardMetadata` other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not prevent the shard's state from being restored. This handles the corner case where a shard has been closed (its ending sequence number set to a non-null value) since the last savepoint or checkpoint, and the job is then restarted from that snapshot.

----


> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>
>                 Key: FLINK-8484
>                 URL: https://issues.apache.org/jira/browse/FLINK-8484
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.2
>            Reporter: Philip Luppens
>            Priority: Major
>              Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to 100 shards, and we use the kinesis-scaling-utils to dynamically scale the Kinesis stream up and down during peak times. What we’ve noticed is that, once shards had been closed, any Flink job restart from a checkpoint or savepoint would result in those shards being re-read from the event horizon, duplicating our events.
>  
> We started checking the checkpoint state and found that the shards were stored correctly with the proper sequence numbers (including for closed shards), but that upon restart, the older closed shards would be read from the event horizon, as if their restored state were being ignored.
>  
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s run() method, we try to match each shard returned from the KinesisDataFetcher against the shards’ metadata from the restoration point, but we do this via a containsKey() call, which relies on StreamShardMetadata’s equals() method. That method compares all properties, including the endingSequenceNumber, which might have changed between the restored state’s checkpoint and our data fetch. The equality check therefore fails, containsKey() returns false, and the shard is re-read from the event horizon even though it was present in the restored state (illustrated in the snippet after this description).
>  
> We’ve created a workaround where we only check for the shardId and stream name to restore the state of the shards we’ve already seen, and this seems to work correctly. 
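
To make the equality problem described above concrete, here is a minimal, self-contained illustration (not taken from the issue or the PR; the shard id and sequence number are made-up values, and it assumes the POJO setters and all-fields equals() of StreamShardMetadata):

    import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

    public class ClosedShardEqualityExample {
        public static void main(String[] args) {
            // Metadata as it looked when the snapshot was taken (shard still open).
            StreamShardMetadata atCheckpoint = new StreamShardMetadata();
            atCheckpoint.setStreamName("my-stream");
            atCheckpoint.setShardId("shardId-000000000000");

            // The same shard rediscovered after resharding: it is now closed,
            // so its ending sequence number has been set.
            StreamShardMetadata afterResharding = new StreamShardMetadata();
            afterResharding.setStreamName("my-stream");
            afterResharding.setShardId("shardId-000000000000");
            afterResharding.setEndingSequenceNumber("49579844037727833994000000000000000000000000");

            // false: equals() covers every field, so containsKey() on a map keyed
            // by StreamShardMetadata misses, even though it is the same shard.
            System.out.println(atCheckpoint.equals(afterResharding));

            // true: matching on stream name + shard id alone still identifies it.
            System.out.println(atCheckpoint.getStreamName().equals(afterResharding.getStreamName())
                && atCheckpoint.getShardId().equals(afterResharding.getShardId()));
        }
    }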



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)