You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Paul Lin (Jira)" <ji...@apache.org> on 2020/01/12 14:38:00 UTC

[jira] [Updated] (FLINK-15220) Add startFromTimestamp in KafkaTableSource

     [ https://issues.apache.org/jira/browse/FLINK-15220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Lin updated FLINK-15220:
-----------------------------
    Description: 
KafkaTableSource supports all startup modes in DataStream API except `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case in Table/SQL API as well.

 

The proposed changes are as follow:
h3. Table Descriptor

A new method should be added to Kafka table descriptor:

```

new Kafka().startFromTimestamp(long millisFromEpoch)

```

And the parameter would be milliseconds from epoch to stay aligned with FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).

Since Kafka 0.8/0.9 that doesn’t support timestamp would likely be deprecated, we can assume users are using Kafka that supports timestamp by default, and throws exceptions if users try to use timestamp startup mode with deprecated Kafka versions during the property validation phase. 
h3. YAML & DDL

YAML and DDL use string-based properties to describe tables, and the proposed keys are as follow:

```

'connector.startup-mode' = 'timestamp',

'connector.startup-timestamp-millis' = '1576145410000',

```

  was:
KafkaTableSource supports all startup modes in DataStream API except `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case in Table/SQL API as well.

 

The proposed changes are as follow:
h3. Table Descriptor

A new method should be added to Kafka table descriptor:

```

new Kafka().startFromTimestamp(long millisFromEpoch)

```

And the parameter would be milliseconds from epoch to stay aligned with FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).

Since Kafka 0.8/0.9 that doesn’t support timestamp would likely be deprecated, we can assume users are using Kafka that supports timestamp by default, and throws exceptions if users try to use timestamp startup mode with deprecated Kafka versions during the property validation phase. 
h3. YAML & DDL

YAML and DDL use string-based properties to describe tables, and the proposed keys are as follow:

```

'connector.startup-mode' = 'timestamp',

'connector.startup-timestamp-millis' = '1576145410000',

'connector.startup-timestamp' = '2019-12-12 10:11:23.123'

```

The timestamp would need to be in form of milliseconds from epoch or "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, a validation exception would be thrown.


> Add startFromTimestamp in KafkaTableSource
> ------------------------------------------
>
>                 Key: FLINK-15220
>                 URL: https://issues.apache.org/jira/browse/FLINK-15220
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Paul Lin
>            Assignee: Paul Lin
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> KafkaTableSource supports all startup modes in DataStream API except `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case in Table/SQL API as well.
>  
> The proposed changes are as follow:
> h3. Table Descriptor
> A new method should be added to Kafka table descriptor:
> ```
> new Kafka().startFromTimestamp(long millisFromEpoch)
> ```
> And the parameter would be milliseconds from epoch to stay aligned with FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
> Since Kafka 0.8/0.9 that doesn’t support timestamp would likely be deprecated, we can assume users are using Kafka that supports timestamp by default, and throws exceptions if users try to use timestamp startup mode with deprecated Kafka versions during the property validation phase. 
> h3. YAML & DDL
> YAML and DDL use string-based properties to describe tables, and the proposed keys are as follow:
> ```
> 'connector.startup-mode' = 'timestamp',
> 'connector.startup-timestamp-millis' = '1576145410000',
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)