Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/24 08:45:41 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

leonardBang commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r561827373



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -20,9 +20,13 @@
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 
 /** Utility to create a {@link StateTtlConfig} object. */
-public class StateTtlConfigUtil {
+public class StateConfigUtil {
+
+    private static final String ROCKSDB_KEYED_STATE_BACKEDN =

Review comment:
       Typo: `BACKEDN` should be `BACKEND`.
   ```suggestion
       private static final String ROCKSDB_KEYED_STATE_BACKEND =
   ``` 

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Utilities
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements which has been attached window start
+     * and window end timestamp to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Slice Assigners
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+            return start + size;
+        }
+
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /** The {@link SliceAssigner} for hopping windows. */
+    public static final class HoppingSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, offset.toMillis());
+        }
+
+        private final long size;
+        private final long slide;
+        private final long offset;
+        private final long sliceSize;
+        private final int numSlicesPerWindow;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
+                                size, slide));
+            }
+            this.size = size;
+            this.slide = slide;
+            this.offset = offset;
+            this.sliceSize = ArithmeticUtils.gcd(size, slide);
+            this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
+            return start + sliceSize;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            // we need to cleanup the first slice of the window
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + sliceSize;
+            reuseList.reset(firstSliceEnd);
+            return reuseList;
+        }
+
+        @Override
+        public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
+            // the iterable to list all the slices of the triggered window
+            Iterable<Long> toBeMerged =
+                    new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
+            // null namespace means use heap data views, instead of state state views
+            callback.merge(null, toBeMerged);
+        }
+
+        @Override
+        public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
+            if (isWindowEmpty.get()) {
+                return Optional.empty();
+            } else {
+                return Optional.of(windowEnd + sliceSize);
+            }
+        }
+    }
+
+    /** The {@link SliceAssigner} for cumulative windows. */
+    public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
+        public CumulativeSliceAssigner withOffset(Duration offset) {
+            return new CumulativeSliceAssigner(rowtimeIndex, maxSize, step, offset.toMillis());
+        }
+
+        private final long maxSize;
+        private final long step;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected CumulativeSliceAssigner(int rowtimeIndex, long maxSize, long step, long offset) {
+            super(rowtimeIndex);
+            if (maxSize <= 0 || step <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+            if (maxSize % step != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+
+            this.maxSize = maxSize;
+            this.step = step;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
+            return start + step;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + step;
+            long lastSliceEnd = windowStart + maxSize;
+            if (windowEnd == firstSliceEnd) {
+                // we reuse state in the first slice, skip cleanup for the first slice
+                return Collections.emptyList();

Review comment:
       ```suggestion
                   return reuseList.clear();
   ```
   I also think we can rename `reuseList` to `expiredSlices`, which describes the logic better.
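
   A minimal sketch of what the suggestion implies, assuming the reusable iterable gets a `clear()` that returns the instance itself so it can be used directly in a `return` statement. The class below is a hypothetical stand-in; the real `ReusableListIterable` in this PR may look different.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the reusable iterable; not the class from the PR. */
final class ExpiredSlicesSketch implements Iterable<Long> {

    private final List<Long> values = new ArrayList<>();

    /** Empties the list and returns this instance, so callers can "return expiredSlices.clear()". */
    ExpiredSlicesSketch clear() {
        values.clear();
        return this;
    }

    /** Replaces the content with a single slice end and returns this instance. */
    ExpiredSlicesSketch reset(long sliceEnd) {
        values.clear();
        values.add(sliceEnd);
        return this;
    }

    @Override
    public Iterator<Long> iterator() {
        return values.iterator();
    }

    public static void main(String[] args) {
        ExpiredSlicesSketch expiredSlices = new ExpiredSlicesSketch();
        for (long sliceEnd : expiredSlices.reset(42L)) {
            System.out.println("expired slice end: " + sliceEnd);
        }
        // After clear() the same object is reused and yields nothing.
        System.out.println(expiredSlices.clear().iterator().hasNext());
    }
}
```

   Reusing one instance this way avoids mixing `Collections.emptyList()` with the reusable list in the same method, at the cost of making the returned iterable valid only until the next call.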

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
##########
@@ -94,7 +94,8 @@ public BytesMap(
             MemoryManager memoryManager,
             long memorySize,
             TypeSerializer<K> keySerializer) {
-        this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, memorySize);

Review comment:
        * Base class for {@link BytesHashMap} and {@link BytesMultiMap}.
   

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
##########
@@ -37,37 +34,26 @@
     private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
 
     private final Object owner;
-    private final @Nullable MemoryManager memoryManager;
+    private final MemoryManager memoryManager;
     private final ArrayList<MemorySegment> cachePages;
     private final int maxPages;
     private final int perRequestPages;
 
     private int pageUsage;
 
-    public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, long memorySize) {
-        this(
-                owner,
-                memoryManager,
-                (int) memorySize
-                        / (memoryManager == null
-                                ? MemoryManager.DEFAULT_PAGE_SIZE
-                                : memoryManager.getPageSize()));
-    }
-
     public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, int maxPages) {

Review comment:
       There are code paths that may come in with a nullable `MemoryManager`; see `BufferDataOverWindowOperator` and `DummyEnvironment`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -218,7 +218,7 @@ public void allocatePages(Object owner, Collection<MemorySegment> target, int nu
         Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
         Preconditions.checkArgument(
                 numberOfPages <= totalNumberOfPages,
-                "Cannot allocate more segments %d than the max number %d",
+                "Cannot allocate more segments %s than the max number %s",

Review comment:
       Nice catch. Could you also fix `Illegal negative checkpoint id: %d` in `FsCheckpointStorageAccess`?
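
   To illustrate why `%d` is a problem here: the sketch below is a simplified stand-in for a `%s`-only template formatter of the kind used by `Preconditions.checkArgument`. It is an assumption modeled on that behavior, not the actual Flink implementation, and the class name is hypothetical.

```java
/** Simplified stand-in for a "%s"-only message formatter; not the Flink implementation. */
final class PlaceholderSketch {

    /** Substitutes arguments into "%s" placeholders only; unused arguments are appended in brackets. */
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int start = 0;
        int i = 0;
        while (i < args.length) {
            int pos = template.indexOf("%s", start);
            if (pos == -1) {
                break; // no more placeholders this formatter understands
            }
            sb.append(template, start, pos).append(args[i++]);
            start = pos + 2;
        }
        sb.append(template.substring(start));
        if (i < args.length) {
            // leftover arguments are not lost, but they are not substituted either
            sb.append(" [").append(args[i++]);
            while (i < args.length) {
                sb.append(", ").append(args[i++]);
            }
            sb.append(']');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("Cannot allocate more segments %s than the max number %s", 5, 3));
        System.out.println(format("Cannot allocate more segments %d than the max number %d", 5, 3));
    }
}
```

   With such a formatter, a `%d` template keeps the literal `%d` in the message, which is why switching to `%s` matters.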

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
+
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.apache.flink.table.runtime.util.WindowKey;
+import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
+import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link WindowBuffer} that buffers input elements in a {@link
+ * WindowBytesMultiMap} and combines buffered elements into state when flushing.
+ */
+public final class RecordsWindowBuffer implements WindowBuffer {
+
+    private final WindowCombineFunction combineFunction;
+    private final WindowBytesMultiMap recordsBuffer;
+    private final WindowKey reuseWindowKey;
+    private final RowDataSerializer recordSerializer;
+
+    private long minTriggerTime = Long.MAX_VALUE;
+
+    public RecordsWindowBuffer(
+            Object operatorOwner,
+            MemoryManager memoryManager,
+            long memorySize,
+            WindowCombineFunction combineFunction,
+            LogicalType[] keyTypes,
+            RowType inputType) {
+        this.combineFunction = combineFunction;
+        LogicalType[] inputFieldTypes =
+                inputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        this.recordsBuffer =
+                new WindowBytesMultiMap(
+                        operatorOwner, memoryManager, memorySize, keyTypes, inputFieldTypes);
+        this.recordSerializer = new RowDataSerializer(inputFieldTypes);
+        this.reuseWindowKey = new WindowKeySerializer(keyTypes.length).createInstance();
+    }
+
+    @Override
+    public void addElement(BinaryRowData key, long sliceEnd, RowData element) throws Exception {
+        // track the lowest trigger time, if watermark exceeds the trigger time, it means there
+        minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
+
+        reuseWindowKey.replace(sliceEnd, key);
+        LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
+        try {
+            recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
+        } catch (EOFException e) {
+            // buffer is full, flush it to state
+            flush();
+            // remember to add the input element again
+            addElement(key, sliceEnd, element);
+        }
+    }
+
+    @Override
+    public void advanceProgress(long progress) throws Exception {
+        if (progress >= minTriggerTime) {

Review comment:
       ```suggestion
           if (progress > minTriggerTime) {
   ```
   When `progress == minTriggerTime`, the watermark won't advance.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceSharedAssigner.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link SliceAssigner} which shares slices for windows, which means a window is divided into
+ * multiple slices and need to merge the slices into windows when emitting windows.
+ *
+ * <p>Classical window of {@link SliceSharedAssigner} is hopping window.
+ */
+@Internal
+public interface SliceSharedAssigner extends SliceAssigner {
+
+    /**
+     * Determines which slices (if any) should be merged.
+     *
+     * @param sliceEnd the triggered slice, identified by end timestamp
+     * @param callback a callback that can be invoked to signal which slices should be merged.
+     */
+    void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;
+
+    /**
+     * Returns the optional end timestamp of next window which should be triggered. Empty if no
+     * following window to trigger for now.
+     *
+     * <p>The purpose of this method is avoid register too many timers for each hopping and
+     * cumulative slice, e.g. HOP(1day, 10s) needs register 4300 timers for every slice. In order to

Review comment:
       Should this be 24 * 60 * 60 / GCD(10, 24 * 60 * 60) - 1 = 8639?
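
   The arithmetic can be checked with a short sketch. It assumes the slice size is GCD(size, slide), as in the `HoppingSliceAssigner` constructor quoted in this review; the class and method names are hypothetical and not part of the PR.

```java
/** Hypothetical helper, not part of the PR: counts slices for a hopping window. */
final class SliceCountSketch {

    /** Euclid's algorithm; assumes both arguments are positive. */
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Number of slices that make up one window, with slice size = gcd(size, slide). */
    static long slicesPerWindow(long sizeSeconds, long slideSeconds) {
        return sizeSeconds / gcd(sizeSeconds, slideSeconds);
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60; // 1 day in seconds
        long slide = 10L;          // 10 seconds
        long slices = slicesPerWindow(size, slide);
        System.out.println(slices);     // prints 8640
        System.out.println(slices - 1); // prints 8639, the figure proposed above
    }
}
```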

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Utilities
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements which has been attached window start
+     * and window end timestamp to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Slice Assigners
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+            return start + size;
+        }
+
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /** The {@link SliceAssigner} for hopping windows. */
+    public static final class HoppingSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, offset.toMillis());
+        }
+
+        private final long size;
+        private final long slide;
+        private final long offset;
+        private final long sliceSize;
+        private final int numSlicesPerWindow;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",

Review comment:
       Hopping windows in general don't have this limitation; only the sliced hopping window requires it. We can improve the message.
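
   One possible shape for the improved check, as a sketch only. The message wording and the helper names are hypothetical, not what the PR ends up using; the point is that the message names the slicing implementation as the source of the restriction.

```java
/** Hypothetical validation helper; the message text is illustrative only. */
final class HopValidationSketch {

    /** True if a window of this size can be cut into slide-sized slices without remainder. */
    static boolean isSliceable(long sizeMs, long slideMs) {
        return sizeMs > 0 && slideMs > 0 && sizeMs % slideMs == 0;
    }

    /** Throws with a message that attributes the restriction to slicing, not to hopping windows in general. */
    static void checkSliceable(long sizeMs, long slideMs) {
        if (!isSliceable(sizeMs, slideMs)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Slicing Hopping Window requires positive size and slide, and size must be "
                                    + "an integral multiple of slide, but got size %dms and slide %dms.",
                            sizeMs, slideMs));
        }
    }

    public static void main(String[] args) {
        checkSliceable(10_000L, 5_000L); // passes silently
        try {
            checkSliceable(10_000L, 3_000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```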

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/StateKeyContext.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.state;
+
+import org.apache.flink.table.data.RowData;
+
+/** Context to switch current key in state backend. */
+public interface StateKeyContext {
+    /** Sets current state key to given value. */

Review comment:
       Minor: add a blank line at the beginning of the class.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------------------------------------------------------------------------
+    // Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on *
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements which has been attached window start
+     * and window end timestamp to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // ------------------------------------------------------------------------
+    // Slice Assigners
+    // ------------------------------------------------------------------------
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",

Review comment:
       ```suggestion
                               "Tumbling Window parameters must satisfy abs(offset) < size, but got size %d ms and offset %d ms.",
   ```

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
+
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.apache.flink.table.runtime.util.WindowKey;
+import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
+import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link WindowBuffer} that buffers input elements in a {@link
+ * WindowBytesMultiMap} and combines buffered elements into state when flushing.
+ */
+public final class RecordsWindowBuffer implements WindowBuffer {
+
+    private final WindowCombineFunction combineFunction;
+    private final WindowBytesMultiMap recordsBuffer;
+    private final WindowKey reuseWindowKey;
+    private final RowDataSerializer recordSerializer;
+
+    private long minTriggerTime = Long.MAX_VALUE;
+
+    public RecordsWindowBuffer(
+            Object operatorOwner,
+            MemoryManager memoryManager,
+            long memorySize,
+            WindowCombineFunction combineFunction,
+            LogicalType[] keyTypes,
+            RowType inputType) {
+        this.combineFunction = combineFunction;
+        LogicalType[] inputFieldTypes =
+                inputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        this.recordsBuffer =
+                new WindowBytesMultiMap(
+                        operatorOwner, memoryManager, memorySize, keyTypes, inputFieldTypes);
+        this.recordSerializer = new RowDataSerializer(inputFieldTypes);
+        this.reuseWindowKey = new WindowKeySerializer(keyTypes.length).createInstance();
+    }
+
+    @Override
+    public void addElement(BinaryRowData key, long sliceEnd, RowData element) throws Exception {
+        // track the lowest trigger time, if watermark exceeds the trigger time, it means there
+        minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);

Review comment:
       Looks like part of this comment is missing: the sentence breaks off after "it means there".

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements an optimized processing for aligned windows which
+ * can apply the slicing optimization. The core idea of slicing optimization is to divide all
+ * elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows have predetermined window boundaries and windows can be divided
+ * into finite number of non-overlapping chunks. The boundary of an aligned window is determined
+ * independently from the time characteristic of the data stream, or messages it receives. For
+ * example, hopping (sliding) window is an aligned window as the window boundaries are predetermined
+ * based on the window size and slide. Aligned windows include tumbling, hopping, cumulative
+ * windows.
+ *
+ * <p>Unaligned Windows are windows determined dynamically based on elements. For example, session
+ * window is an unaligned window as the window boundaries are determined based on the messages
+ * timestamps and their correlations. Currently, unaligned windows include session window only.
+ *
+ * <p>Because aligned windows can be divided into finite number of non-overlapping chunks (a.k.a.
+ * slices), which can apply efficient processing to share intermediate results.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>Dividing a window of aligned windows into a finite number of non-overlapping chunks, where the
+ * chunks are slices. It has the following properties:
+ *
+ * <ul>
+ *   <li>An element must only belong to a single slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window is consist of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a simple wrap of {@link SlicingWindowProcessor}. It delegates all
+ * the important methods to the underlying processor, where the processor can have different
+ * implementation for aggregate and topk or others.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: since {@link SlicingWindowProcessor} leverages slicing optimization for aligned windows,
+ * therefore, it doesn't support unaligned windows, e.g. session window.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire and late-arrival.
+ * Thus late elements (elements belong to emitted windows) will be simply dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The tracked processing time triggered last time. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register timer again, because we need to skip current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       I'm confused by this piece of code: `lastTriggeredProcessingTime` is always the largest timestamp across multiple keys, but the watermark is always the smallest across multiple keys. The test result is still as expected even if I call `onTimer` directly in the current timer.
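
       To make the question concrete, here is a minimal sketch of only the guard shown in the quoted lines, assuming timers for different keys fire in non-decreasing timestamp order. `ProcessingTimeGuardSketch` and `advancedTo` are hypothetical names for illustration; whatever the operator does after the guard is not shown in the excerpt and is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the guard in onProcessingTime; not the PR's operator. */
public final class ProcessingTimeGuardSketch {

    private long lastTriggeredProcessingTime = Long.MIN_VALUE;

    /** Records each timestamp for which progress was advanced. */
    private final List<Long> advancedTo = new ArrayList<>();

    /** Returns true if this timer is the first one seen for a new, larger timestamp. */
    public boolean onProcessingTime(long timestamp) {
        if (timestamp > lastTriggeredProcessingTime) {
            lastTriggeredProcessingTime = timestamp;
            // Stands in for windowProcessor.advanceProgress(timestamp).
            advancedTo.add(timestamp);
            return true;
        }
        return false;
    }

    public List<Long> advancedTo() {
        return advancedTo;
    }
}
```

       Feeding timers with timestamps 100, 100, 100, 200, 200 (three keys, then two keys) through this sketch advances progress once per distinct timestamp, so the guard passes for the first timer of each timestamp and is skipped for the rest.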
