Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2018/08/01 12:21:48 UTC

Re: Access to Kafka Event Time

Any feedback?

On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi <vi...@gmail.com>
wrote:

> In fact it may be available elsewhere too (for example in a ProcessFunction),
> but we have no need to create one: this is just a data relay (Kafka to HDFS),
> and any intermediate processing should be avoided if possible, IMHO.
>
> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> We have a use case where multiple topics are streamed to HDFS, and we
>> want to create buckets based on ingestion time (the time the events were
>> pushed to Kafka). Our producers to Kafka will set that as the event time.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>
>> suggests that "previousElementTimestamp" will provide that timestamp,
>> provided the "EventTime" characteristic is set. The method also receives
>> the element. In our case the element will expose a setIngestionTIme(long
>> time) method. Is the element in this method
>>
>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>
>> passed by reference, and can it be safely (losslessly) mutated for
>> downstream operators?
>>
>>
>> That said, there is another place where that record timestamp is
>> available:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>
>> Is it possible to change the signature of the method at
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>
>> to add the record timestamp as the last argument?
>>
>> Regards,
>>
>> Vishal
>

Re: Access to Kafka Event Time

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks a lot! Awesome that 1.6 will give access to the timestamp of the element.

On Tue, Aug 7, 2018, 4:19 AM Aljoscha Krettek <al...@apache.org> wrote:

> Hi Vishal,
>
> to answer the original question: it should not be assumed that mutations of
> the element will be reflected downstream. For your situation this means
> that you have to use a ProcessFunction to put the timestamp of a record
> into the record itself.
>
> Also, Flink 1.6 will come with the next version of the BucketingSink
> called StreamingFileSink, where the Bucketer interface was updated to allow
> access to the element timestamp. The new interface is now called
> BucketAssigner.
>
> Best,
> Aljoscha

Re: Access to Kafka Event Time

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Vishal,

to answer the original question: it should not be assumed that mutations of the element will be reflected downstream. For your situation this means that you have to use a ProcessFunction to put the timestamp of a record into the record itself.
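
A minimal sketch of the "put the timestamp into the record" step, in plain Java so that it runs without Flink on the classpath. The Event class, its fields, and the TimestampStamper helper are hypothetical names invented for illustration; the thread does not show the real element type. In an actual job this logic would presumably sit inside ProcessFunction#processElement, with the record timestamp read from the function's Context (where it can be null if no timestamp is attached). The sketch returns a new object rather than mutating the input, in line with the advice above.

```java
import java.util.Objects;

/** Hypothetical record type; the real element class is not shown in the thread. */
final class Event {
    final String payload;
    final long ingestionTime; // epoch millis, -1 when unknown (a convention of this sketch)

    Event(String payload, long ingestionTime) {
        this.payload = Objects.requireNonNull(payload);
        this.ingestionTime = ingestionTime;
    }
}

/** Hypothetical helper holding the logic a process function body could call. */
final class TimestampStamper {
    /**
     * Returns a copy of the event that carries the record timestamp.
     * The input event is left untouched. recordTimestamp may be null
     * when the record has no timestamp attached.
     */
    static Event withIngestionTime(Event in, Long recordTimestamp) {
        long ts = (recordTimestamp != null) ? recordTimestamp : -1L;
        return new Event(in.payload, ts);
    }
}
```

Downstream operators, including the sink, then read ingestionTime from the record itself instead of relying on a side effect of an earlier operator.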

Also, Flink 1.6 will come with the next version of the BucketingSink called StreamingFileSink, where the Bucketer interface was updated to allow access to the element timestamp. The new interface is now called BucketAssigner.
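
To illustrate what a timestamp-based bucket could look like, here is a rough sketch in plain Java that derives an hourly bucket id from a topic name and an element timestamp. The TimestampBuckets class, the topic-prefixed layout, the "yyyy-MM-dd--HH" pattern, and the UTC zone are all assumptions made for this example, not something prescribed by Flink or by this thread. In a real job the same computation would presumably live in a BucketAssigner implementation that obtains the timestamp from the context it is given.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: maps (topic, timestamp) to a bucket id such as "clicks/2018-07-31--12". */
final class TimestampBuckets {
    // Hourly pattern in UTC; both are choices made for this sketch.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Builds the bucket id from the topic name and the timestamp in epoch milliseconds. */
    static String bucketId(String topic, long timestampMillis) {
        return topic + "/" + HOURLY.format(Instant.ofEpochMilli(timestampMillis));
    }
}
```

Because the bucket id depends only on the timestamp carried with the element, no intermediate operator is needed just to bucket by ingestion time.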

Best,
Aljoscha



Re: Access to Kafka Event Time

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Vishal,

> We have a use case where multiple topics are streamed to hdfs and we
> would want to create buckets based on ingestion time

If I understand correctly, you want to create buckets based on event time.
Maybe you can use a window [1]. For example, a tumbling window of 5 minutes
groups rows into 5-minute intervals, and you can get the window start time
(TUMBLE_START(time_attr, interval)) and end time (TUMBLE_END(time_attr,
interval)) when emitting the output.
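
As a rough illustration of how a tumbling window assigns a timestamp to an interval, the plain-Java sketch below computes the start and end of the window that contains a given timestamp. The TumblingWindows class and its method names are hypothetical; it assumes windows aligned to the epoch with no offset, and it treats the end as an exclusive bound. It is not the Flink implementation.

```java
/** Hypothetical helper: epoch-aligned tumbling windows, no offset. */
final class TumblingWindows {
    /** Start (inclusive) of the window containing timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the window containing timestampMillis. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }
}
```

With a 5-minute size (300000 ms), every timestamp from the window start up to, but not including, the window end falls into the same bucket.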

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows
