Posted to user-zh@flink.apache.org by USERNAME <or...@126.com> on 2020/01/21 09:21:33 UTC
Is there a difference between CountEvictor and TriggerResult.FIRE_AND_PURGE in how they clear window data?
Hi all, Happy New Year~
[1] TriggerResult.FIRE_AND_PURGE
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
[2] CountEvictor
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
Re: Re: Is there a difference between CountEvictor and TriggerResult.FIRE_AND_PURGE in how they clear window data?
Posted by tison <wa...@gmail.com>.
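Take a look at the EvictingWindowOperator code, or more specifically the call chain of Evictor#evictBefore. Its handling of the window
state is rather hacky, and explaining it in words would not be any more concise than the code itself: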
private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
        .from(contents)
        .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
            @Override
            public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                return TimestampedValue.from(input);
            }
        });
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents = recordsWithTimestamp
        .transform(new Function<TimestampedValue<IN>, IN>() {
            @Override
            public IN apply(TimestampedValue<IN> input) {
                return input.getValue();
            }
        });

    processContext.window = triggerContext.window;
    userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    //work around to fix FLINK-4369, remove the evicted elements from the windowState.
    //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}
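The last few lines of that method are what answer the memory/RocksDB question: because the window's ListState is treated as append-only here, the evicted elements are only removed from the state by clearing it and re-adding the survivors, as the FLINK-4369 comment says. Below is a minimal plain-Java model of that clear-and-rewrite pattern, not Flink code. `AppendOnlyState`, `evictFirst` and `fireWithEviction` are hypothetical names, and the state is assumed to behave like a list that supports only add, get and clear.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of the clear-and-rewrite workaround; no Flink classes are used. */
public class EvictRewriteSketch {

    /** Hypothetical stand-in for an append-only ListState: only add, get and clear. */
    static final class AppendOnlyState<T> {
        private final List<T> backing = new ArrayList<>();

        void add(T value) { backing.add(value); }

        /** Returns a copy, so callers cannot delete single elements from the state itself. */
        List<T> get() { return new ArrayList<>(backing); }

        void clear() { backing.clear(); }
    }

    /** Keeps only the last maxCount elements by removing from the head, like a count evictor. */
    static <T> void evictFirst(List<T> elements, int maxCount) {
        int toEvict = elements.size() - maxCount;
        Iterator<T> it = elements.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }

    /** One firing: read the contents, evict, then rebuild the state from the survivors. */
    static <T> List<T> fireWithEviction(AppendOnlyState<T> state, int maxCount) {
        List<T> contents = state.get();
        evictFirst(contents, maxCount);
        // A window function would process `contents` here.
        // Evicted elements leave the state only because it is cleared and rewritten.
        state.clear();
        for (T survivor : contents) {
            state.add(survivor);
        }
        return contents;
    }

    public static void main(String[] args) {
        AppendOnlyState<Integer> state = new AppendOnlyState<>();
        for (int i = 1; i <= 5; i++) {
            state.add(i);
        }
        fireWithEviction(state, 3);
        System.out.println(state.get()); // [3, 4, 5]
    }
}
```

The copy returned by `get()` is deliberate: it mimics a state that cannot be edited in place, which is why the rewrite step is needed at all.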
Best,
tison.
On Tue, Jan 21, 2020 at 8:25 PM, USERNAME <or...@126.com> wrote:
> Is the data dropped by evict also deleted from the state in memory or in RocksDB at the same time?
Re: Re: Is there a difference between CountEvictor and TriggerResult.FIRE_AND_PURGE in how they clear window data?
Posted by USERNAME <or...@126.com>.
Is the data dropped by evict also deleted from the state in memory or in RocksDB at the same time?
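On 2020-01-21 17:27:38, "tison" <wa...@gmail.com> wrote:
>I happen to be looking at this part. There is a difference: consider a sliding count window.
>
>[1] discards the entire windowState after firing, whereas [2] actually recomputes the windowState after eviction.
>
>Best,
>tison.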
Re: Is there a difference between CountEvictor and TriggerResult.FIRE_AND_PURGE in how they clear window data?
Posted by tison <wa...@gmail.com>.
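I happen to be looking at this part. There is a difference: consider a sliding count window.
[1] discards the entire windowState after firing, whereas [2] actually recomputes the windowState after eviction, so the elements that were not evicted stay in the state.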
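To illustrate the sliding count window case: as far as I know, `KeyedStream#countWindow(size, slide)` in Flink combines `GlobalWindows`, `CountTrigger.of(slide)` (which fires without purging) and `CountEvictor.of(size)`. The sketch below is a plain-Java simulation, not Flink code; `withCountEvictor` and `withFireAndPurge` are hypothetical names, and it assumes the evictor trims from the oldest element, as a count evictor does.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation contrasting "evict and keep" with "fire and purge". */
public class SlidingCountSketch {

    /** Fires every `slide` elements; a count evictor trims the state to the last `size` elements. */
    static List<List<Integer>> withCountEvictor(List<Integer> input, int size, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> state = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : input) {
            state.add(element);
            if (++sinceLastFire == slide) {
                sinceLastFire = 0;
                // Evict from the head until at most `size` elements remain.
                while (state.size() > size) {
                    state.remove(0); // int argument, so this removes by index
                }
                fired.add(new ArrayList<>(state)); // survivors stay for the next firing
            }
        }
        return fired;
    }

    /** Fires every `slide` elements and purges the whole window state each time. */
    static List<List<Integer>> withFireAndPurge(List<Integer> input, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> state = new ArrayList<>();
        for (int element : input) {
            state.add(element);
            if (state.size() == slide) {
                fired.add(new ArrayList<>(state));
                state.clear(); // nothing carries over into the next firing
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(withCountEvictor(input, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
        System.out.println(withFireAndPurge(input, 2));    // [[1, 2], [3, 4], [5, 6]]
    }
}
```

With the evictor, consecutive outputs overlap because survivors are written back into the state; purging on every firing drops that overlap, so the result degenerates into a tumbling window of `slide` elements.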
Best,
tison.
On Tue, Jan 21, 2020 at 5:21 PM, USERNAME <or...@126.com> wrote:
> Hi all, Happy New Year~
>
> [1] TriggerResult.FIRE_AND_PURGE
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
> [2] CountEvictor
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377