Posted to user@flink.apache.org by kant kodali <ka...@gmail.com> on 2020/03/03 21:10:32 UTC

How to print the aggregated state every time it is updated?

Hi All,

I have a custom aggregated state that is represented by a Set<Long>, and I
have a stream of values coming in from Kafka. I inspect each value, compute
the custom aggregation, and store it in the Set<Long>. Now I am trying to
figure out how to print the updated value every time this state is updated.

Imagine I have a DataStream<Set<Long>>.

I have tried a few things already but keep running into the following
exception. I am not sure why. Do I need to call
assignTimestampsAndWatermarks? I thought watermarks were not mandatory in
Flink, especially when I want to keep this aggregated state forever. Any
simple code sample showing how to print the streaming aggregated state
represented by DataStream<Set<Long>> would be great! You can assume my
Set<Long> has a toString() method that takes care of printing, and I just
want to see those values on stdout.

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
(= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?

Re: How to print the aggregated state every time it is updated?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kant,

If you only want to output every second, you probably want to use a
ProcessFunction with timers [1].

Basically, this function holds the state and manages the updates to it.
The updates should also be stored in a local (non-state) variable *changes*.
Whenever the timer fires, you output *changes* (possibly to a side
output) and reset it.
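
The buffering idea above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the logic: the class name ChangeBufferSketch and the methods onElement and onTimer are hypothetical names chosen to resemble the element and timer callbacks of a process function, and the two in-memory sets stand in for Flink-managed state and the local *changes* variable.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch (no Flink): keep the full aggregate, remember what
// changed since the last timer, and hand out only those changes on a timer.
final class ChangeBufferSketch {
    // Stands in for the long-lived aggregated state (assumption: a set of longs).
    private final Set<Long> state = new LinkedHashSet<>();
    // Local buffer of values added since the last timer fired.
    private final Set<Long> changes = new LinkedHashSet<>();

    // Called once per incoming value; records it as a change only if it is new.
    void onElement(long value) {
        if (state.add(value)) {
            changes.add(value);
        }
    }

    // Called when the (simulated) timer fires; returns the buffered changes
    // and resets the buffer so the next call starts empty.
    List<Long> onTimer() {
        List<Long> out = new ArrayList<>(changes);
        changes.clear();
        return out;
    }

    // Defensive copy of the full aggregate, for inspection.
    Set<Long> currentState() {
        return new LinkedHashSet<>(state);
    }

    public static void main(String[] args) {
        ChangeBufferSketch f = new ChangeBufferSketch();
        f.onElement(5L);
        f.onElement(7L);
        f.onElement(5L);                  // duplicate, not a change
        System.out.println(f.onTimer());  // changes seen in the first interval
        f.onElement(9L);
        System.out.println(f.onTimer());  // changes seen in the second interval
        System.out.println(f.currentState());
    }
}
```

In a real job the timer would be registered through the timer service of a keyed process function, and the local buffer would be lost on failure unless it is also kept in state; both points are outside this sketch.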

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers

(In reply to Robert Metzger's message of Fri, Mar 6, 2020 at 4:39 PM.)

Re: How to print the aggregated state every time it is updated?

Posted by Robert Metzger <rm...@apache.org>.
Hey,

I don't think you need to use a window operator for this use case. A reduce
(or fold) operation should be enough:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
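
To illustrate why a reduce is enough here, the following plain-Java sketch (no Flink dependency) mimics the behaviour of a rolling reduce: every incoming element is combined with the running aggregate, and the new aggregate is emitted immediately. The names RollingReduceSketch, rollingReduce and union are hypothetical, and merging sets by union is only an assumed stand-in for the custom aggregation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BinaryOperator;

// Plain-Java sketch (no Flink): a rolling reduce emits the updated
// aggregate after every input, so no window or watermark is involved.
final class RollingReduceSketch {

    // Combines each input with the running aggregate and records the
    // aggregate after every step, one output per input.
    static <T> List<T> rollingReduce(List<T> inputs, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        T current = null;
        for (T in : inputs) {
            current = (current == null) ? in : reducer.apply(current, in);
            emitted.add(current);
        }
        return emitted;
    }

    // Assumed aggregation for the example: set union, without mutating the inputs.
    static Set<Long> union(Set<Long> a, Set<Long> b) {
        Set<Long> merged = new TreeSet<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        List<Set<Long>> inputs = List.of(Set.of(1L), Set.of(2L), Set.of(1L, 3L));
        // Prints one line per input, each showing the aggregate so far.
        for (Set<Long> updated : rollingReduce(inputs, RollingReduceSketch::union)) {
            System.out.println(updated);
        }
    }
}
```

In Flink terms this corresponds roughly to a reduce on a keyed stream followed by print(); the sketch only models the emit-on-every-update behaviour, not state backends or parallelism.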


(In reply to kant kodali's message of Fri, Mar 6, 2020 at 11:50 AM.)

Re: How to print the aggregated state every time it is updated?

Posted by kant kodali <ka...@gmail.com>.
Hi,

Thanks for this. So how can I emulate an infinite window while outputting
every second? Simply put, I want to store the state forever (say, for years),
and since RocksDB is my state backend, I am assuming I can store the state
until I run out of disk. However, I want to see all the updates to the state
every second. It sounds to me like I need a window of one second, compute
for that window, and pass the result on to the next window. Or is there some
other way?

Thanks

(In reply to Congxian Qiu's message of Fri, Mar 6, 2020 at 1:33 AM.)

Re: How to print the aggregated state every time it is updated?

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

From the description, it sounds like you are using a window operator with
the time characteristic set to event time. In that case you should call
`DataStream.assignTimestampsAndWatermarks` to set the timestamps and
watermarks. An event-time window is triggered only when the watermark passes
the window end time, so without watermarks it never fires.
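
The two rules described above can be modelled in a few lines of plain Java (no Flink dependency). This is a simplified sketch, not Flink's implementation: the names EventTimeWindowSketch, windowStart and shouldFire are hypothetical, windows are assumed to be tumbling with no offset, and a record without a timestamp is represented by Long.MIN_VALUE, as in the exception message from the original post.

```java
// Plain-Java sketch (no Flink) of event-time windowing rules:
// 1) a record must carry a timestamp to be assigned to a window;
// 2) a window fires once the watermark reaches its last timestamp.
final class EventTimeWindowSketch {
    // Marker for "no timestamp assigned", matching the value named in the exception.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Start of the tumbling window (assumed zero offset) containing the timestamp.
    static long windowStart(long timestamp, long sizeMs) {
        if (timestamp == NO_TIMESTAMP) {
            throw new IllegalStateException(
                "Record has no timestamp; assign timestamps and watermarks first");
        }
        return Math.floorDiv(timestamp, sizeMs) * sizeMs;
    }

    // True once the watermark has reached the last timestamp that belongs to the window.
    static boolean shouldFire(long windowStart, long sizeMs, long watermark) {
        long lastTimestampInWindow = windowStart + sizeMs - 1;
        return watermark >= lastTimestampInWindow;
    }

    public static void main(String[] args) {
        long size = 1000L;
        long start = windowStart(1500L, size);
        System.out.println("window start: " + start);
        System.out.println("fires at watermark 1998: " + shouldFire(start, size, 1998L));
        System.out.println("fires at watermark 1999: " + shouldFire(start, size, 1999L));
    }
}
```

With processing time, or with an operator that does not use windows at all, neither rule applies, which is why the exception disappears in those setups.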

Best,
Congxian


(In reply to kant kodali's message of Wed, Mar 4, 2020 at 5:11 AM.)