Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2018/03/30 04:23:57 UTC

Record timestamp from kafka

Hi,

Is there a way to get the Kafka timestamp in the deserialization schema? All
records are written to Kafka with a timestamp, and I would like to set that
timestamp on every record that is ingested. Thanks.

Re: Record timestamp from kafka

Posted by Ben Yan <ya...@gmail.com>.

> On Apr 10, 2018, at 7:32 PM, Ben Yan <ya...@gmail.com> wrote:
> 
> Hi Chesnay,
> 
> I think it would be better without such a limitation. I would also like to ask about another problem. When I use BucketingSink (writing to AWS S3), a few files are not renamed after a checkpoint, so a small number of the final files still carry the underscore prefix. After analysis, I found that this is caused by S3's eventual consistency. Are there any better solutions available? Thanks.
> See: https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
> Best
> Ben
> 
>> On Apr 10, 2018, at 6:29 PM, Ben Yan <yan.xiao.bin.mail@gmail.com> wrote:
>> 
>> Hi Fabian,
>> 
>> If I use a ProcessFunction, I can get it! But I would like to know how to get the Kafka timestamp in functions such as map and flatMap on a DataStream, using the Scala API.
>> Thanks!
>> 
>> Best
>> Ben
>> 
>>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> 
>>> Hi Navneeth,
>>> 
>>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if you configure EventTime for an application [1].
>>> Since Flink treats record timestamps as metadata, they are not directly accessible by most functions. You can implement a ProcessFunction [2] to access the timestamp of a record via the ProcessFunction's Context object.
>>> 
>>> Best, Fabian
>>> 
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>> 
>>> 2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.mail@gmail.com>:
>>> Hi,
>>> Is that what you mean?
>>> See: https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>>> 
>>> Best
>>> Ben
>>> 
>>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <reachnavneeth2@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Is there a way to get the Kafka timestamp in the deserialization schema? All records are written to Kafka with a timestamp, and I would like to set that timestamp on every record that is ingested. Thanks.


Re: Record timestamp from kafka

Posted by Ben Yan <ya...@gmail.com>.
Hi Fabian,

I think it would be better without such a limitation. I would also like to ask about another problem. When I use BucketingSink (writing to AWS S3), a few files are not renamed after a checkpoint, so a small number of the final files still carry the underscore prefix. After analysis, I found that this is caused by S3's eventual consistency. Are there any better solutions available? Thanks.

See: https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22

Best
Ben



Re: Record timestamp from kafka

Posted by Chesnay Schepler <ch...@apache.org>.
You must use a ProcessFunction for this; the timestamps are not exposed
in any way to map/flatMap functions.
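
A minimal Scala sketch of one way to work around this, assuming Flink 1.4's
Scala DataStream API and that event time is enabled so that the Kafka 0.10
consumer attaches timestamps: a ProcessFunction copies each record's timestamp
into the element itself, after which ordinary map/flatMap functions can read
it as a plain field. The names Stamped, AttachTimestamp, and
TimestampExample.withTimestamps are hypothetical, and dropping records that
carry no timestamp is only one possible choice.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical wrapper: the element plus the timestamp Flink attached to it.
case class Stamped[T](value: T, timestamp: Long)

// Copies the record timestamp (metadata) into the element itself.
class AttachTimestamp[T] extends ProcessFunction[T, Stamped[T]] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, Stamped[T]]#Context,
      out: Collector[Stamped[T]]): Unit = {
    // Context.timestamp() returns a java.lang.Long that is null when the
    // record has no timestamp (for example, if event time is not enabled).
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect(Stamped(value, ts.longValue()))
    }
    // Assumption: records without a timestamp are silently dropped here.
  }
}

object TimestampExample {
  // After this step the timestamp is an ordinary field for map/flatMap.
  def withTimestamps(records: DataStream[String]): DataStream[Stamped[String]] =
    records.process(new AttachTimestamp[String])
}
```

Downstream, something like withTimestamps(kafkaStream).map(s => (s.value, s.timestamp))
would then see the timestamp inside a regular map function.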



Re: Record timestamp from kafka

Posted by Ben Yan <ya...@gmail.com>.
Hi Fabian,

If I use a ProcessFunction, I can get it! But I would like to know how to get the Kafka timestamp in functions such as map and flatMap on a DataStream, using the Scala API.
Thanks!

Best
Ben



Re: Record timestamp from kafka

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Navneeth,

Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if
you configure EventTime for an application [1].
Since Flink treats record timestamps as metadata, they are not directly
accessible by most functions. You can implement a ProcessFunction [2] to
access the timestamp of a record via the ProcessFunction's Context object.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction


Re: Record timestamp from kafka

Posted by Ben Yan <ya...@gmail.com>.
Hi,
Is that what you mean?
See: https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145

Best
Ben
