You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2020/01/16 09:45:00 UTC

[jira] [Closed] (FLINK-15220) Add startFromTimestamp in KafkaTableSource

     [ https://issues.apache.org/jira/browse/FLINK-15220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz closed FLINK-15220.
------------------------------------
    Fix Version/s: 1.11.0
       Resolution: Fixed

Implemented in 26c908207ae266da9e85d4fd6f5ffc4aacc74f7d

> Add startFromTimestamp in KafkaTableSource
> ------------------------------------------
>
>                 Key: FLINK-15220
>                 URL: https://issues.apache.org/jira/browse/FLINK-15220
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Paul Lin
>            Assignee: Paul Lin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> KafkaTableSource supports all startup modes in DataStream API except `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case in Table/SQL API as well.
>  
> The proposed changes are as follow:
> h3. Table Descriptor
> A new method should be added to Kafka table descriptor:
> ```
> new Kafka().startFromTimestamp(long millisFromEpoch)
> ```
> And the parameter would be milliseconds from epoch to stay aligned with FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
> Since Kafka 0.8/0.9 that doesn’t support timestamp would likely be deprecated, we can assume users are using Kafka that supports timestamp by default, and throws exceptions if users try to use timestamp startup mode with deprecated Kafka versions during the property validation phase. 
> h3. YAML & DDL
> YAML and DDL use string-based properties to describe tables, and the proposed keys are as follow:
> ```
> 'connector.startup-mode' = 'timestamp',
> 'connector.startup-timestamp-millis' = '1576145410000',
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)