You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2023/04/07 03:19:40 UTC

WindowAssigner中windowStagger作用

如题,目前看实现,这个 windowStagger 是针对 opeartor 的众多 subtask 之间,针对每个 subtask
生成了一个固定的 offset 作用于该 subtask 处理的元素。因为 staggerOffset 是在 assignWindows
中生成,而且只有第一次会生成,后续复用。如下:

if (staggerOffset == null) {
    staggerOffset =
            windowStagger.getStaggerOffset(context.getCurrentProcessingTime(),
size);
}
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start =
        TimeWindow.getWindowStartWithOffset(
                timestamp, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));


所以,这个功能的目的是,分散不同 subtask
之间的窗口周期offset嘛?那如果实际生产中,是一个TM只有1个slot,整个作业只有1个window类算子的话。这个windowstagger好像没起到分散cpu压力的作用?还是说他的作用在于分散
window 算子整体输出时机而已,并不在乎单个 subtask 的输出时机对嘛?