You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "eugen yushin (JIRA)" <ji...@apache.org> on 2018/09/25 13:37:00 UTC

[jira] [Created] (FLINK-10422) Follow AWS specs in Kinesis Consumer

eugen yushin created FLINK-10422:
------------------------------------

             Summary: Follow AWS specs in Kinesis Consumer 
                 Key: FLINK-10422
                 URL: https://issues.apache.org/jira/browse/FLINK-10422
             Project: Flink
          Issue Type: Improvement
          Components: Kinesis Connector
    Affects Versions: 1.6.1
            Reporter: eugen yushin


*Related conversation in mailing list:*

[https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E]

*Summary:*

Flink Kinesis consumer checks shards id for a particular pattern:
{noformat}
"^shardId-\\d{12}"
{noformat}
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132]

While this inlines with current Kinesis streams server implementation (all
 streams follows this pattern), it confronts with AWS docs:

 
{code:java}
ShardId
 The unique identifier of the shard within the stream.
 Type: String
 Length Constraints: Minimum length of 1. Maximum length of 128.
Pattern: [a-zA-Z0-9_.-]+
 Required: Yes
{code}
 

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html]

*Intention:*
 We have no guarantees and can't rely on patterns other than provided in AWS
 manifest.
 Any custom implementation of Kinesis mock should rely on AWS manifest which
 claims ShardID to be alfanums. This prevents anyone to use Flink with such
 kind of mocks.

The reason behind the scene to use particular pattern "^shardId-
d12" is to create Flink's custom Shard comparator, filter already seen shards, and
 pass latest shard for client.listShards only to limit the scope for RPC
 call to AWS.

In the meantime, I think we can get rid of this logic at all. The current
 usage in project is:
 - fix Kinesalite bug (I've already opened an issue to cover this:
 [https://github.com/mhart/kinesalite/issues/76] and opened PR: [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to
 test code base to keep production code clean for now
 [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464]

 - adjust last seen shard id. We can simply omit this cause' AWS client
 won't return already seen shards and we will have new ids only or nothing.
 [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475]
 [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)