Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2019/07/29 12:58:44 UTC

Re: Why is the size of each checkpoint increasing?

I think the size of the checkpoint strongly depends on the data you are
feeding into this function. Depending on the actual values, it might be
that you never fire the window. Please verify what carData actually returns.

Cheers,
Till

On Mon, Jul 29, 2019 at 11:09 AM 陈Darling <ch...@gmail.com> wrote:

>
> Flink version is 1.8.1.
> The example is adapted from TopSpeedWindowing:
>
> DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
>         .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
>         .keyBy(0)
>         .countWindow(countSize, slideSize)
>         .trigger(DeltaTrigger.of(triggerMeters,
>                 new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
>                     private static final long serialVersionUID = 1L;
>
>                     @Override
>                     public double getDelta(
>                             Tuple4<Integer, Integer, Double, Long> oldDataPoint,
>                             Tuple4<Integer, Integer, Double, Long> newDataPoint) {
>                         return newDataPoint.f2 - oldDataPoint.f2;
>                     }
>                 }, carData.getType().createSerializer(env.getConfig())))
>         .maxBy(1).setParallelism(parallelism);
>
>
> The checkpoint size keeps increasing, from 100 KB to 100 MB.
>
> Why is the size of each checkpoint increasing?
>
> In DeltaTrigger.java, I found the clear method. As I understand it, every checkpoint should therefore be the same size:
>
> @Override
> public void clear(W window, TriggerContext ctx) throws Exception {
>    ctx.getPartitionedState(stateDesc).clear();
> }
>
>
>
> Has anyone encountered a similar problem?
>
> Darling
> Andrew D.Lin
>
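Till's point that the window may never fire can be checked offline against sample data. The sketch below is a plain-Java model (no Flink dependency) of the firing decision in `DeltaTrigger.onElement`, paraphrased from the Flink 1.8 source as I understand it: the first element is remembered and yields CONTINUE; afterwards the trigger returns FIRE, and remembers the new element, only when the delta against the last remembered element exceeds the threshold. The delta is `newDataPoint.f2 - oldDataPoint.f2`, as in the question. The class name, the local `TriggerResult` enum (which only mirrors the constant names of Flink's enum) and the sample `f2` values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class DeltaTriggerModel {

    // Hypothetical stand-in for Flink's TriggerResult; only the constant names are mirrored.
    // In this model, as in DeltaTrigger, the PURGE variants are never returned.
    enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    private final double threshold;
    private Double lastFired; // models the trigger's per-key ValueState

    DeltaTriggerModel(double threshold) {
        this.threshold = threshold;
    }

    // Same delta as the DeltaFunction in the question: newDataPoint.f2 - oldDataPoint.f2.
    private static double delta(double oldF2, double newF2) {
        return newF2 - oldF2;
    }

    // Paraphrase of DeltaTrigger.onElement: remember the first element, then FIRE
    // (and remember the new element) only when the delta exceeds the threshold.
    TriggerResult onElement(double f2) {
        if (lastFired == null) {
            lastFired = f2;
            return TriggerResult.CONTINUE;
        }
        if (delta(lastFired, f2) > threshold) {
            lastFired = f2;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    // Returns the indices of the elements that make the trigger fire.
    static List<Integer> firingIndices(double threshold, double[] f2Values) {
        DeltaTriggerModel trigger = new DeltaTriggerModel(threshold);
        List<Integer> fired = new ArrayList<>();
        for (int i = 0; i < f2Values.length; i++) {
            if (trigger.onElement(f2Values[i]) == TriggerResult.FIRE) {
                fired.add(i);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical f2 values for a single key.
        double[] growing = {0, 20, 40, 60, 80, 100, 120};
        double[] flat = {10, 12, 11, 15, 14, 13, 12};
        System.out.println(firingIndices(50, growing)); // [3, 6]
        System.out.println(firingIndices(50, flat));    // []
    }
}
```

If the real `carData` values behave like the `flat` series for some keys, those windows would never fire in this model.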

Re: Why is the size of each checkpoint increasing?

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Andrew,
From the Flink docs [1]: "Flink guarantees removal only for time-based
windows and not for other types, *e.g.* global windows (see Window Assigners
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-assigners>)."
So it seems the state of a fired window is not cleared for a count window.
You can verify this by checking which `TriggerResult` value your trigger
returns each time it is called.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-lifecycle
Best,
Congxian
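The documentation quote can be tied back to the `clear()` method quoted in the question. As far as I can tell from the Flink 1.8 `WindowOperator` source, `Trigger.clear()` is invoked for non-merging windows only when the window's cleanup timer fires, and no cleanup timer is registered when the computed cleanup time is `Long.MAX_VALUE`; `GlobalWindow.maxTimestamp()`, the window type behind `countWindow`, returns `Long.MAX_VALUE`. The plain-Java sketch below paraphrases that computation. The class and method names are hypothetical, and the 60-second window is an arbitrary example.

```java
public class CleanupTimeModel {

    // Mirrors the value GlobalWindow.maxTimestamp() returns.
    static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

    // Paraphrase of the cleanup-time computation: event-time windows add the
    // allowed lateness (guarding against overflow); other windows use maxTimestamp.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness, boolean isEventTime) {
        if (isEventTime) {
            long cleanup = windowMaxTimestamp + allowedLateness;
            return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
        }
        return windowMaxTimestamp;
    }

    // Paraphrase of the cleanup-timer registration: no timer for "end of time",
    // so Trigger.clear() is never reached through window cleanup.
    static boolean registersCleanupTimer(long windowMaxTimestamp, long allowedLateness, boolean isEventTime) {
        return cleanupTime(windowMaxTimestamp, allowedLateness, isEventTime) != Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // A hypothetical 60-second event-time window [0, 60000) has maxTimestamp 59999.
        System.out.println(registersCleanupTimer(59_999L, 0L, true)); // true
        // Global windows (used by countWindow) are not event-time windows.
        System.out.println(registersCleanupTimer(GLOBAL_WINDOW_MAX_TIMESTAMP, 0L, false)); // false
        // Even with allowed lateness, the overflow guard yields Long.MAX_VALUE.
        System.out.println(registersCleanupTimer(GLOBAL_WINDOW_MAX_TIMESTAMP, 1_000L, true)); // false
    }
}
```

Under this reading, the `clear()` in DeltaTrigger.java is correct but is simply never called for a global window, which matches the documentation's statement that removal is only guaranteed for time-based windows.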


On Tue, Jul 30, 2019 at 7:20 PM 陈Darling <ch...@gmail.com> wrote:

> Thanks, Rohrmann. Your answer inspired me.
>
> CountWindow uses a CountTrigger by default, but I replaced it with another trigger.
>
> Parallelism is 1.
>
> .trigger(DeltaTrigger.of(50, deltaFunction, stateSerializer))
>
>
> Through testing, I found that the data is generated much faster than the trigger
> fires. countSize and slideSize are both 300000, and the DeltaTrigger threshold is 50.
> The CountWindow size is larger than the trigger threshold.
>
> Could this be the cause?
>
>
>
> Darling
> Andrew D.Lin
>
>

Fwd: Why is the size of each checkpoint increasing?

Posted by 陈Darling <ch...@gmail.com>.
Thanks, Rohrmann. Your answer inspired me.

CountWindow uses a CountTrigger by default, but I replaced it with another trigger.

Parallelism is 1.

.trigger(DeltaTrigger.of(50, deltaFunction, stateSerializer))


Through testing, I found that the data is generated much faster than the trigger fires. countSize and slideSize are both 300000, and the DeltaTrigger threshold is 50.
The CountWindow size is larger than the trigger threshold.

Could this be the cause?



Darling 
Andrew D.Lin
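To reason about the 300000 / 50 setup, it helps to recall how `countWindow(size, slide)` is built in Flink 1.8, as far as I know: `GlobalWindows`, plus `CountEvictor.of(size)`, plus `CountTrigger.of(slide)`. Calling `.trigger(...)` afterwards replaces only the trigger; the evictor stays. With an evictor, every element of the window is buffered in state, and the evictor runs only when the trigger fires, keeping at most `size` elements (dropping the oldest first). The plain-Java sketch below models that buffer for a single key. The class name and the small numbers are hypothetical, chosen so the bound is easy to see.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class EvictingBufferModel {

    private final int maxCount;                            // models CountEvictor.of(countSize)
    private final Deque<Long> buffer = new ArrayDeque<>(); // models the window's list state

    EvictingBufferModel(int maxCount) {
        this.maxCount = maxCount;
    }

    // Buffers one element; only when the trigger fired, evict from the head
    // so that at most maxCount elements remain.
    void onElement(long element, boolean triggerFired) {
        buffer.addLast(element);
        if (triggerFired) {
            while (buffer.size() > maxCount) {
                buffer.removeFirst();
            }
        }
    }

    int bufferedElements() {
        return buffer.size();
    }

    // Feeds n elements and fires the trigger on every fireEvery-th element
    // (fireEvery <= 0 means the trigger never fires). Returns the final buffer size.
    static int simulate(int maxCount, int n, int fireEvery) {
        EvictingBufferModel model = new EvictingBufferModel(maxCount);
        for (int i = 1; i <= n; i++) {
            boolean fired = fireEvery > 0 && i % fireEvery == 0;
            model.onElement(i, fired);
        }
        return model.bufferedElements();
    }

    public static void main(String[] args) {
        // Trigger fires regularly: after each firing the buffer is cut back to maxCount.
        System.out.println(simulate(5, 99, 3)); // 5
        // Trigger never fires: nothing is evicted and the buffer keeps growing.
        System.out.println(simulate(5, 99, 0)); // 99
    }
}
```

Mapped onto the numbers in the message, this model would let each key keep up to 300000 elements in state even when the trigger fires regularly, and an unbounded number between firings. That would be consistent with growing checkpoints, but it is a model of the mechanism, not a measurement of this job.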

