Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2018/05/02 11:05:00 UTC

[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

    [ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460871#comment-16460871 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8500 at 5/2/18 11:04 AM:
---------------------------------------------------------------------

+1 to the general idea; simplifying the schema methods and unifying the keyed / non-keyed versions seems very appealing.

However, I suggest not exposing Kafka's {{ConsumerRecord}} directly; otherwise our API compatibility becomes dependent on Kafka not changing this class.
This might be OK, since users should expect API changes across major Kafka connector versions anyway.
It would also mean that we might eventually need version-specific serialization schema classes instead of a single base one.
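<br>
For illustration, a minimal sketch of what exposing the Kafka class directly could look like (the interface name and shape here are hypothetical, not an agreed-upon API); note that it would also unify the keyed / non-keyed variants, since the key travels inside the record:

{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch: a single schema interface that hands Kafka's
// ConsumerRecord straight to user code. Convenient, but it couples our
// public API to Kafka's class, so any change to ConsumerRecord across
// Kafka versions becomes a potential breaking change for users.
public interface KafkaRecordDeserializationSchema<T> extends Serializable {

    T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException;
}
{code}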

Another approach, if we want to avoid unexpected API breaks caused by Kafka, is to have our own {{ConsumerRecordMetaInfo}} class that we pass to the serialization schema.
The downside is that we would have to make sure our own class exposes everything in Kafka's {{ConsumerRecord}}.
It's a bit more manual, but it should be manageable in a non-API-breaking way (whenever we have to add more information to that class).
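<br>
As a rough illustration of this second approach (the class name comes from the comment above; the exact fields are an assumption based on what Kafka's {{ConsumerRecord}} exposes as of 0.10+):

{code:java}
import java.io.Serializable;

// Illustrative sketch only: a Flink-owned carrier for Kafka record metadata,
// so Kafka's ConsumerRecord never appears in our public API. New Kafka
// metadata (e.g. headers in 0.11+) can later be added as extra fields and
// getters without breaking existing user implementations.
public final class ConsumerRecordMetaInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp; // Kafka message timestamp (0.10+)
    private final byte[] key;
    private final byte[] value;

    public ConsumerRecordMetaInfo(String topic, int partition, long offset,
                                  long timestamp, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public long getOffset() { return offset; }
    public long getTimestamp() { return timestamp; }
    public byte[] getKey() { return key; }
    public byte[] getValue() { return value; }
}
{code}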

I'm leaning more towards the second approach.


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message timestamp' parameter (taken from ConsumerRecord). In some business scenarios, this is useful! (A sketch of the signature follows below.)
>  
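<br>
For context, the Flink 1.4 method the description refers to has the following shape, together with one hypothetical way to surface the record timestamp (the extra parameter and the default method are illustrative only, not a committed API):

{code:java}
import java.io.IOException;
import java.io.Serializable;

// Condensed sketch of Flink 1.4's KeyedDeserializationSchema#deserialize,
// plus a hypothetical overload that also carries the Kafka record timestamp.
public interface TimestampedKeyedDeserializationSchema<T> extends Serializable {

    // Shape of the existing Flink 1.4 method:
    T deserialize(byte[] messageKey, byte[] message,
                  String topic, int partition, long offset) throws IOException;

    // Hypothetical variant exposing the timestamp; it defaults to the old
    // method so existing implementations keep working unchanged.
    default T deserialize(byte[] messageKey, byte[] message,
                          String topic, int partition, long offset,
                          long timestamp) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}
{code}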



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)