Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2016/04/16 05:47:25 UTC

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

    [ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15243991#comment-15243991 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3229:
--------------------------------------------

(Duplicate comment from FLINK-3211. Posting it here also to keep the issue updated.)

https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer, which I have been testing in non-production environments and have updated for the recent Flink 1.0 changes.
I'm still refactoring the code a bit to make it easier to unit test. The PR will follow very soon.

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3229
>                 URL: https://issues.apache.org/jira/browse/FLINK-3229
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement a data source consumer for the Kinesis streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties config = new Properties();
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, "1000");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
> config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");
> AWSCredentialsProvider credentials = // credentials API in AWS SDK
> DataStream<T> kinesisRecords = env
>     .addSource(new FlinkKinesisConsumer<>(
>         listOfStreams, credentials, new SimpleStringSchema(), config
>     ));
> {code}
> I am still considering which read start positions to support ("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussion on this can be found at https://issues.apache.org/jira/browse/FLINK-3211.


