Posted to issues@flink.apache.org by "eugen yushin (JIRA)" <ji...@apache.org> on 2018/09/25 13:37:00 UTC

[jira] [Updated] (FLINK-10422) Follow AWS specs in Kinesis Consumer

     [ https://issues.apache.org/jira/browse/FLINK-10422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

eugen yushin updated FLINK-10422:
---------------------------------
    Labels: pull-request-available  (was: )

> Follow AWS specs in Kinesis Consumer 
> -------------------------------------
>
>                 Key: FLINK-10422
>                 URL: https://issues.apache.org/jira/browse/FLINK-10422
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.6.1
>            Reporter: eugen yushin
>            Priority: Major
>              Labels: pull-request-available
>
> *Related conversation in mailing list:*
> [https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E]
> *Summary:*
> The Flink Kinesis consumer validates shard IDs against a fixed pattern:
> {noformat}
> "^shardId-\\d{12}"
> {noformat}
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132]
> While this is in line with the current Kinesis Streams server implementation (all
>  streams follow this pattern), it conflicts with the AWS docs:
>  
> {code:java}
> ShardId
>  The unique identifier of the shard within the stream.
>  Type: String
>  Length Constraints: Minimum length of 1. Maximum length of 128.
> Pattern: [a-zA-Z0-9_.-]+
>  Required: Yes
> {code}
>  
> [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html]
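To make the mismatch concrete, here is a minimal sketch that checks a few shard IDs against both patterns quoted above. It is not Flink's code: the class name `ShardIdPatternCheck`, its method names, and the sample IDs are hypothetical and chosen only for illustration. The two regular expressions and the length bounds are copied from the text above.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink or the AWS SDK.
class ShardIdPatternCheck {
    // Pattern quoted in this issue as the one the consumer checks.
    static final Pattern STRICT = Pattern.compile("^shardId-\\d{12}");
    // Pattern from the AWS Shard API reference quoted above.
    static final Pattern AWS_DOCUMENTED = Pattern.compile("[a-zA-Z0-9_.-]+");

    // True only if the whole ID has the form shardId-<12 digits>.
    static boolean matchesStrict(String shardId) {
        return STRICT.matcher(shardId).matches();
    }

    // True if the ID satisfies the documented pattern and length bounds (1..128).
    static boolean matchesAwsSpec(String shardId) {
        return shardId.length() >= 1
            && shardId.length() <= 128
            && AWS_DOCUMENTED.matcher(shardId).matches();
    }

    public static void main(String[] args) {
        // Sample IDs are made up; the last two are the kind a custom mock might return.
        String[] ids = {"shardId-000000000001", "shard-1", "my_mock.shard-A"};
        for (String id : ids) {
            System.out.println(id + " strict=" + matchesStrict(id)
                + " awsSpec=" + matchesAwsSpec(id));
        }
    }
}
```

Under these assumptions, every ID accepted by the strict pattern is also valid per the AWS docs, but not the other way around, which is the gap this issue is about.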
> *Intention:*
>  We have no guarantees and can't rely on any pattern other than the one given in
>  the AWS specification.
>  Any custom Kinesis mock implementation should follow the AWS specification, which
>  defines ShardId as a string of letters, digits, '_', '.' and '-'. The stricter
>  check prevents anyone from using Flink with such mocks.
> The reason for relying on the particular pattern "^shardId-\\d{12}" is to build
>  Flink's custom shard comparator, filter already seen shards, and pass only the
>  latest shard to client.listShards, which limits the scope of the RPC
>  call to AWS.
> That said, I think we can get rid of this logic altogether. Its current
>  usages in the project are:
>  - a fix for a Kinesalite bug (I've already opened an issue to cover this:
>  [https://github.com/mhart/kinesalite/issues/76] and a PR: [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to
>  the test code base to keep production code clean for now
>  [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464]
>  - adjusting the last seen shard id. We can simply omit this because the AWS client
>  won't return already seen shards, so we will get either new ids only or nothing.
>  [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475]
>  [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406]
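As a rough illustration of the point that filtering already seen shards does not have to depend on the ID format, the sketch below tracks seen shard IDs in a set and compares them by plain string equality. It is an assumption-laden example, not the change proposed in the pull request: the class `SeenShardFilter` and its method `filterNew` are hypothetical and do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Flink.
class SeenShardFilter {
    // IDs of every shard returned so far, compared by plain string equality.
    private final Set<String> seen = new HashSet<>();

    // Returns the IDs from 'listed' that were not returned by an earlier call,
    // preserving their order. No assumption is made about the ID format.
    List<String> filterNew(List<String> listed) {
        List<String> fresh = new ArrayList<>();
        for (String id : listed) {
            if (seen.add(id)) { // add() returns false if the ID was already present
                fresh.add(id);
            }
        }
        return fresh;
    }
}
```

A filter like this accepts any ID allowed by the AWS pattern, at the cost of keeping all seen IDs in memory, whereas the comparator approach only needs the latest one.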



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)