Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/20 10:34:39 UTC

[GitHub] [flink] wuchong opened a new pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

wuchong opened a new pull request #14708:
URL: https://github.com/apache/flink/pull/14708


   
   ## What is the purpose of the change
   
   We have supported cumulative windows since FLINK-19605. However, the current cumulative window implementation is not efficient, because slices are not shared.
   
   We leverage the slicing ideas proposed in FLINK-7001 and this design doc [1]. Slicing is an optimized implementation for hopping, cumulative, and tumbling windows. Besides that, we introduce a ManagedMemory-based mini-batch optimization for the slicing window aggregate operator, which tremendously reduces state accesses and achieves higher throughput without any latency loss.
   
   [1]: https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit#
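   
   To make the slicing idea concrete, here is a minimal, self-contained sketch (illustrative names and arithmetic only, not this PR's actual `SliceAssigner` API), assuming a hopping window whose size is a multiple of its slide, so that each slide becomes one slice shared by the overlapping windows:
   
   ```java
   // Illustrative sketch only - not the PR's actual SliceAssigner API.
   public final class SliceSketch {
   
       /** Start of the slice containing the timestamp (non-negative epoch millis). */
       static long sliceStart(long timestampMs, long sliceSizeMs) {
           return timestampMs - (timestampMs % sliceSizeMs);
       }
   
       public static void main(String[] args) {
           long size = 10_000L;  // hopping window size: 10s
           long slide = 5_000L;  // slide: 5s -> slice size = 5s (size is a multiple of slide)
           long ts = 12_000L;    // element timestamp: 12s
   
           long start = sliceStart(ts, slide);
           System.out.println("slice: [" + start + ", " + (start + slide) + ")");
           // The slice [10000, 15000) is aggregated once; its result is shared by
           // every hopping window that covers it:
           for (long winEnd = start + slide; winEnd < start + slide + size; winEnd += slide) {
               System.out.println("window: [" + (winEnd - size) + ", " + winEnd + ")");
           }
       }
   }
   ```
   
   With shared slices, each per-slice aggregate is computed once and merged into every window that covers it, instead of accumulating every element once per overlapping window.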
   
   ## Brief change log
   
   For slicing window abstraction:
   
   - Introduce a basic `SlicingWindowOperator` to support slicing window operators, for aggregate now and Top-N in the future.
   - Introduce the abstraction of `SliceAssigner` and several implementations, e.g. for tumbling, hopping, and cumulative windows.
   
   For window aggregate operators:
   
   - Introduce the abstraction of `WindowBuffer` to support different strategies for buffering data. Currently we only support buffering raw records using `WindowBytesMultiMap`. In the future, we will support buffering accumulators using `WindowBytesHashMap`.
   - Introduce the abstraction of `WindowCombineFunction` to support different strategies for combining buffered data into state. Currently we only support combining and accumulating raw records into state. In the future, we will support combining accumulators into state. (A sketch of the buffer/combine flow follows this list.)
   - Implement `SliceSharedWindowAggProcessor`, which aggregates data based on shared slices.
   - Implement `SliceUnsharedWindowAggProcessor`, which aggregates data based on unshared slices.
   - Introduce `SlicingWindowAggOperatorBuilder` for fluently building a window aggregate operator.
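   
   The following is a minimal sketch of the buffer/combine flow above, with a plain `HashMap` standing in for the managed-memory `WindowBytesMultiMap` and a simplified stand-in for `WindowCombineFunction`; the real operator flushes when managed memory fills up or when the watermark advances:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class MiniBatchBufferSketch {
   
       /** Simplified stand-in for the PR's WindowCombineFunction abstraction. */
       interface WindowCombine {
           void combine(String keyAndSlice, List<String> records);
       }
   
       /** Buffered raw records keyed by (key, sliceEnd); stand-in for WindowBytesMultiMap. */
       private final Map<String, List<String>> buffer = new HashMap<>();
   
       /** Buffer the record in memory instead of accessing state for every element. */
       void add(String keyAndSlice, String record) {
           buffer.computeIfAbsent(keyAndSlice, k -> new ArrayList<>()).add(record);
       }
   
       /** Flush on watermark advance or when memory is full: one state access per (key, slice). */
       void flush(WindowCombine combiner) {
           for (Map.Entry<String, List<String>> e : buffer.entrySet()) {
               combiner.combine(e.getKey(), e.getValue()); // accumulate the whole batch into state
           }
           buffer.clear();
       }
   }
   ```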
   
   ## Verifying this change
   
   - Added unit tests for every `SliceAssigner` implementation, covering every method.
   - Added unit tests for tumbling, hopping, and cumulative windows in both event-time and processing-time modes.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)





[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563298480



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
##########
@@ -37,37 +34,26 @@
     private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
 
     private final Object owner;
-    private final @Nullable MemoryManager memoryManager;
+    private final MemoryManager memoryManager;
     private final ArrayList<MemorySegment> cachePages;
     private final int maxPages;
     private final int perRequestPages;
 
     private int pageUsage;
 
-    public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, long memorySize) {
-        this(
-                owner,
-                memoryManager,
-                (int) memorySize
-                        / (memoryManager == null
-                                ? MemoryManager.DEFAULT_PAGE_SIZE
-                                : memoryManager.getPageSize()));
-    }
-
     public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, int maxPages) {

Review comment:
       The nullable `MemoryManager` was introduced earlier this month, see facd51fcf5c78c5f733c203c38e3a093ba8a5218. It was introduced because streaming operators couldn't use the `MemoryManager` at that time, so we had to allocate memory from the JVM heap. But now that FLINK-20860 is resolved, we don't need this hack anymore.
   
    `BufferDataOverWindowOperator` uses `getContainingTask().getEnvironment().getMemoryManager()` to get the `MemoryManager`, which should never be null in our case, because we never use `DummyEnvironment` in our tests.
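   
    For context, here is a minimal, self-contained sketch of the lazy pooling idea (hypothetical stand-ins: `byte[]` pages instead of Flink `MemorySegment`s, and no real `MemoryManager`): pages are allocated on demand up to a cap, and returned pages are cached for reuse.
   
    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;
   
    final class LazyPagePoolSketch {
        private static final int PAGE_SIZE = 32 * 1024;
   
        private final Deque<byte[]> cachedPages = new ArrayDeque<>();
        private final int maxPages;
        private int allocatedPages;
   
        LazyPagePoolSketch(int maxPages) {
            this.maxPages = maxPages;
        }
   
        /** Lazily allocate a page only when the cache is empty and the cap allows it. */
        byte[] nextPage() {
            byte[] cached = cachedPages.poll();
            if (cached != null) {
                return cached; // reuse a returned page
            }
            if (allocatedPages >= maxPages) {
                throw new IllegalStateException("page pool exhausted");
            }
            allocatedPages++;
            return new byte[PAGE_SIZE]; // stands in for memoryManager.allocatePages(owner, ...)
        }
   
        /** Return a page to the pool so later requests can reuse it. */
        void returnPage(byte[] page) {
            cachedPages.add(page);
        }
    }
    ```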







[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * e00fb2507a257ad657632047b3d2ec2e5b38a737 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12311) 
   * c49626cfa8cd631c731f7db6395465af1711bd20 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * c49626cfa8cd631c731f7db6395465af1711bd20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12426) 
   * 8d3e884dfaf6092dac5e15363beb60702d90e1c0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 888fb9f7a4fee6a724bdbde0550f0192008af55e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * c49626cfa8cd631c731f7db6395465af1711bd20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12426) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-766367200


   Thanks for the review, @leonardBang. I hope I have addressed all your concerns.





[GitHub] [flink] leonardBang commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563445932



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements optimized processing for aligned windows which
+ * can apply the slicing optimization. The core idea of the slicing optimization is to divide all
+ * elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows that have predetermined window boundaries and can be divided
+ * into a finite number of non-overlapping chunks. The boundary of an aligned window is determined
+ * independently of the time characteristics of the data stream and the messages it receives. For
+ * example, a hopping (sliding) window is an aligned window, as the window boundaries are
+ * predetermined based on the window size and slide. Aligned windows include tumbling, hopping,
+ * and cumulative windows.
+ *
+ * <p>Unaligned Windows are windows whose boundaries are determined dynamically based on the
+ * elements. For example, a session window is an unaligned window, as its boundaries are
+ * determined based on the timestamps of the messages and their correlations. Currently, session
+ * windows are the only unaligned windows.
+ *
+ * <p>Because aligned windows can be divided into a finite number of non-overlapping chunks
+ * (a.k.a. slices), they allow efficient processing that shares intermediate results.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>A slice is one of the finite number of non-overlapping chunks that an aligned window is
+ * divided into. Slices have the following properties:
+ *
+ * <ul>
+ *   <li>An element must belong to exactly one slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window consists of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a simple wrapper around {@link SlicingWindowProcessor}. It
+ * delegates all the important methods to the underlying processor, which can have different
+ * implementations for aggregate, Top-N, and others.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: since {@link SlicingWindowProcessor} leverages the slicing optimization for aligned
+ * windows, it doesn't support unaligned windows, e.g. session windows.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire and late-arrival.
+ * Thus late elements (elements belonging to emitted windows) will simply be dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The tracked processing time triggered last time. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register timer again, because we need to skip current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       I deleted the processing timer in `CombineRecordsFunction` at the same time.







[GitHub] [flink] flinkbot commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763509713


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 888fb9f7a4fee6a724bdbde0550f0192008af55e (Wed Jan 20 10:36:45 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 480de3a5924eb8a615e173e3044a2e6cf22e16e4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12284) 
   * e00fb2507a257ad657632047b3d2ec2e5b38a737 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563307764



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements optimized processing for aligned windows which
+ * can apply the slicing optimization. The core idea of the slicing optimization is to divide all
+ * elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows that have predetermined window boundaries and can be divided
+ * into a finite number of non-overlapping chunks. The boundary of an aligned window is determined
+ * independently of the time characteristics of the data stream and the messages it receives. For
+ * example, a hopping (sliding) window is an aligned window, as the window boundaries are
+ * predetermined based on the window size and slide. Aligned windows include tumbling, hopping,
+ * and cumulative windows.
+ *
+ * <p>Unaligned Windows are windows whose boundaries are determined dynamically based on the
+ * elements. For example, a session window is an unaligned window, as its boundaries are
+ * determined based on the timestamps of the messages and their correlations. Currently, session
+ * windows are the only unaligned windows.
+ *
+ * <p>Because aligned windows can be divided into a finite number of non-overlapping chunks
+ * (a.k.a. slices), they allow efficient processing that shares intermediate results.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>A slice is one of the finite number of non-overlapping chunks that an aligned window is
+ * divided into. Slices have the following properties:
+ *
+ * <ul>
+ *   <li>An element must belong to exactly one slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window consists of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a simple wrapper around {@link SlicingWindowProcessor}. It
+ * delegates all the important methods to the underlying processor, which can have different
+ * implementations for aggregate, Top-N, and others.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: since {@link SlicingWindowProcessor} leverages the slicing optimization for aligned
+ * windows, it doesn't support unaligned windows, e.g. session windows.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire and late-arrival.
+ * Thus late elements (elements belonging to emitted windows) will simply be dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The tracked processing time triggered last time. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register timer again, because we need to skip current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       > the lastTriggeredProcessingTime is always the biggest timestamp of multiple keys
   
    Nope. The `lastTriggeredProcessingTime` should be the smallest timestamp among the timers (keys, in your question?) that have not been triggered yet, because timers are triggered in order of timer time.
   
    > The test result is as expected even I call onTimer in current timer.
   
    Are you sure? The `SlicingWindowAggOperatorTest` will fail if I remove the `return` in the `onProcessingTime` method.
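   
    To make the ordering argument concrete, here is a tiny self-contained sketch (hypothetical names, not the operator's real API) of the skip-and-re-register pattern: the first timer that reaches a new processing time only flushes the buffer and re-registers itself, so windows are emitted only by timers that fire after the flush.
   
    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;
   
    final class TimerSkipSketch {
        private long lastTriggeredProcessingTime = Long.MIN_VALUE;
        /** Stand-in for re-registered timers; the real operator uses InternalTimerService. */
        final Deque<long[]> reRegistered = new ArrayDeque<>(); // {key, timestamp}
   
        void onProcessingTime(long key, long timestamp) {
            if (timestamp > lastTriggeredProcessingTime) {
                lastTriggeredProcessingTime = timestamp;
                System.out.println("flush buffer into state at " + timestamp);
                reRegistered.add(new long[] {key, timestamp}); // skip this firing
                return;
            }
            System.out.println("emit window for key " + key + " at " + timestamp);
        }
   
        public static void main(String[] args) {
            TimerSkipSketch op = new TimerSkipSketch();
            op.onProcessingTime(1, 5000); // first timer at t=5000: flush + re-register
            op.onProcessingTime(2, 5000); // same t, buffer already flushed: emit key 2
            while (!op.reRegistered.isEmpty()) {
                long[] t = op.reRegistered.poll();
                op.onProcessingTime(t[0], t[1]); // re-registered timer now emits key 1
            }
        }
    }
    ```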




[GitHub] [flink] wuchong closed pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong closed pull request #14708:
URL: https://github.com/apache/flink/pull/14708


   





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * c49626cfa8cd631c731f7db6395465af1711bd20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12426) 
   * 8d3e884dfaf6092dac5e15363beb60702d90e1c0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12434) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 480de3a5924eb8a615e173e3044a2e6cf22e16e4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12284) 
   * e00fb2507a257ad657632047b3d2ec2e5b38a737 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12311) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 888fb9f7a4fee6a724bdbde0550f0192008af55e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12280) 
   * 480de3a5924eb8a615e173e3044a2e6cf22e16e4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12284) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763512047


   Hi @xintongsong , could you help to review the first two small commits related to the managed memory?





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * e00fb2507a257ad657632047b3d2ec2e5b38a737 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12311) 
   * c49626cfa8cd631c731f7db6395465af1711bd20 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12426) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 888fb9f7a4fee6a724bdbde0550f0192008af55e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12280) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] leonardBang commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563445932



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements optimized processing for aligned windows that can
+ * apply the slicing optimization. The core idea of the slicing optimization is to divide all
+ * elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows that have predetermined boundaries and can be divided into a
+ * finite number of non-overlapping chunks. The boundary of an aligned window is determined
+ * independently of the time characteristics of the data stream, or the messages it receives. For
+ * example, a hopping (sliding) window is an aligned window, as its boundaries are predetermined
+ * based on the window size and slide. Aligned windows include tumbling, hopping, and cumulative
+ * windows.
+ *
+ * <p>Unaligned Windows are windows whose boundaries are determined dynamically based on the
+ * elements. For example, a session window is an unaligned window, as its boundaries are determined
+ * based on the messages' timestamps and their correlations. Currently, the session window is the
+ * only unaligned window.
+ *
+ * <p>Because aligned windows can be divided into a finite number of non-overlapping chunks (a.k.a.
+ * slices), efficient processing can be applied to share intermediate results across windows.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>A slice is one of the finite, non-overlapping chunks that an aligned window is divided into.
+ * Slices have the following properties:
+ *
+ * <ul>
+ *   <li>An element must only belong to a single slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window consists of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a thin wrapper around {@link SlicingWindowProcessor}. It
+ * delegates all the important methods to the underlying processor, which can have different
+ * implementations for aggregate, topk, or other operations.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and performs its calculations based on them. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: because {@link SlicingWindowProcessor} leverages the slicing optimization for aligned
+ * windows, it doesn't support unaligned windows, e.g. the session window.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire or late-arrival.
+ * Thus late elements (elements belonging to already emitted windows) will simply be dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The tracked processing time triggered last time. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register the timer again, because we need to skip the current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       I also delete the processing timer in `CombineRecordsFunction` at the same time.
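       
       For readers without the full diff: deleting a pending processing-time timer goes through the same `InternalTimerService` that registered it. A minimal sketch (the surrounding `CombineRecordsFunction` code is not shown in this thread, so the variable names here are assumptions):
       
           // when the buffered records are combined and flushed, drop the pending
           // processing-time timer so the re-registered timer cannot fire twice
           internalTimerService.deleteProcessingTimeTimer(window, triggerTime);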







[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * e00fb2507a257ad657632047b3d2ec2e5b38a737 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12311) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563306860



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Utilities
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements, which have window start and window
+     * end timestamps attached, to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Slice Assigners
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+            return start + size;
+        }
+
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /** The {@link SliceAssigner} for hopping windows. */
+    public static final class HoppingSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, offset.toMillis());
+        }
+
+        private final long size;
+        private final long slide;
+        private final long offset;
+        private final long sliceSize;
+        private final int numSlicesPerWindow;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
+                                size, slide));
+            }
+            this.size = size;
+            this.slide = slide;
+            this.offset = offset;
+            this.sliceSize = ArithmeticUtils.gcd(size, slide);
+            this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
+            return start + sliceSize;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            // we need to clean up the first slice of the window
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + sliceSize;
+            reuseList.reset(firstSliceEnd);
+            return reuseList;
+        }
+
+        @Override
+        public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
+            // the iterable to list all the slices of the triggered window
+            Iterable<Long> toBeMerged =
+                    new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
+            // null namespace means use heap data views, instead of state views
+            callback.merge(null, toBeMerged);
+        }
+
+        @Override
+        public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
+            if (isWindowEmpty.get()) {
+                return Optional.empty();
+            } else {
+                return Optional.of(windowEnd + sliceSize);
+            }
+        }
+    }
+
+    /** The {@link SliceAssigner} for cumulative windows. */
+    public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
+        public CumulativeSliceAssigner withOffset(Duration offset) {
+            return new CumulativeSliceAssigner(rowtimeIndex, maxSize, step, offset.toMillis());
+        }
+
+        private final long maxSize;
+        private final long step;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected CumulativeSliceAssigner(int rowtimeIndex, long maxSize, long step, long offset) {
+            super(rowtimeIndex);
+            if (maxSize <= 0 || step <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+            if (maxSize % step != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+
+            this.maxSize = maxSize;
+            this.step = step;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
+            return start + step;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + step;
+            long lastSliceEnd = windowStart + maxSize;
+            if (windowEnd == firstSliceEnd) {
+                // we reuse state in the first slice, skip cleanup for the first slice
+                return Collections.emptyList();

Review comment:
       Unfortunately, we can't, because the `reuseList` is not only used to construct the expired slices but also the merged slices. I think it's fine to treat it as a reused wrapper for the result. 
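       
       For context, a minimal sketch of what such a reusable result wrapper might look like (the PR's actual `ReusableListIterable` may differ, e.g. it may also implement `IterableIterator`):
       
           /** A mutable Iterable over slice-end timestamps, reset before each use to avoid allocations. */
           private static final class ReusableListIterable implements Iterable<Long> {
               private final List<Long> values = new ArrayList<>();
       
               void reset(long sliceEnd) {
                   values.clear();
                   values.add(sliceEnd);
               }
       
               @Override
               public Iterator<Long> iterator() {
                   return values.iterator();
               }
           }
       
       Since the single instance is mutated in place, a caller must fully consume the returned iterable before asking the assigner for the next result, which is what allows one instance to serve as the wrapper for both expired and merged slices.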










[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763509713


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8d3e884dfaf6092dac5e15363beb60702d90e1c0 (Fri May 28 07:13:07 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] wuchong commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-766367200












[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   ## CI report:
   
   * 480de3a5924eb8a615e173e3044a2e6cf22e16e4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12284) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-766624340


   The build passed: https://dev.azure.com/imjark/Flink/_build/results?buildId=397&view=results





[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563305734



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceSharedAssigner.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link SliceAssigner} which shares slices among windows, which means a window is divided into
+ * multiple slices, and the slices need to be merged back into windows when the windows are emitted.
+ *
+ * <p>A classical window for {@link SliceSharedAssigner} is the hopping window.
+ */
+@Internal
+public interface SliceSharedAssigner extends SliceAssigner {
+
+    /**
+     * Determines which slices (if any) should be merged.
+     *
+     * @param sliceEnd the triggered slice, identified by end timestamp
+     * @param callback a callback that can be invoked to signal which slices should be merged.
+     */
+    void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;
+
+    /**
+     * Returns the optional end timestamp of the next window that should be triggered. Empty if
+     * there is currently no following window to trigger.
+     *
+     * <p>The purpose of this method is to avoid registering too many timers for each hopping and
+     * cumulative slice, e.g. HOP(1day, 10s) would need to register 4300 timers for every slice. In order to

Review comment:
       Should it be 8640? Why -1?
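       
       For reference, a quick back-of-the-envelope check with the gcd-based slice size from `HoppingSliceAssigner` (a sketch, not part of the PR):
       
           long size = 24 * 60 * 60 * 1000L;                  // HOP size: 1 day = 86,400,000 ms
           long slide = 10 * 1000L;                           // HOP slide: 10 s = 10,000 ms
           long sliceSize = ArithmeticUtils.gcd(size, slide); // 10,000 ms
           long slicesPerWindow = size / sliceSize;           // 8,640 slices per window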







[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563306124



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Utilities
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements, which have window start and window
+     * end timestamps attached, to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+    // Slice Assigners
+    // ------—------—------—------—------—------—------—------—------—------—------—------—------—
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",

Review comment:
       This is on purpose, to make the message more concise, e.g. `got size 10000ms and offset 20000ms`.







[GitHub] [flink] xintongsong commented on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
xintongsong commented on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763519912


   The first two commits LGTM.





[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563307764



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements optimized processing for aligned windows to which
+ * the slicing optimization can be applied. The core idea of the slicing optimization is to divide
+ * all elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows that have predetermined window boundaries and can be divided
+ * into a finite number of non-overlapping chunks. The boundaries of an aligned window are
+ * determined independently of the time characteristics of the data stream and the messages it
+ * receives. For example, a hopping (sliding) window is an aligned window, as its boundaries are
+ * predetermined by the window size and slide. Aligned windows include tumbling, hopping, and
+ * cumulative windows.
+ *
+ * <p>Unaligned Windows are windows whose boundaries are determined dynamically, based on the
+ * elements. For example, a session window is an unaligned window, as its boundaries are derived
+ * from the message timestamps and their correlations. Currently, the session window is the only
+ * unaligned window.
+ *
+ * <p>Because aligned windows can be divided into a finite number of non-overlapping chunks
+ * (a.k.a. slices), they admit efficient processing that shares intermediate results across
+ * overlapping windows.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>A slice is one of the finite number of non-overlapping chunks into which an aligned window
+ * is divided. Slices have the following properties:
+ *
+ * <ul>
+ *   <li>An element must belong to exactly one slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window consists of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a thin wrapper around a {@link SlicingWindowProcessor}. It
+ * delegates all the important methods to the underlying processor, which can have different
+ * implementations, e.g. for aggregation, Top-K, or other operations.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: since {@link SlicingWindowProcessor} leverages the slicing optimization for aligned
+ * windows, it doesn't support unaligned windows, e.g. session windows.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire and late-arrival.
+ * Thus, late elements (elements belonging to already emitted windows) are simply dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The last processing-time timestamp for which timers were triggered. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register timer again, because we need to skip current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       > the lastTriggeredProcessingTime is always the biggest timestamp of multiple keys
   
   Nope. The `lastTriggeredProcessingTime` should be the smallest timestamp among the timers (keys in your question?) which have not been triggered yet, because timers are triggered in order of timer time.
   
   > The test result is as expected even I call onTimer in current timer.
   
   Are you sure? The `SlicingWindowAggOperatorTest` will fail if I remove the `return` in the `onProcessingTime` method.
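   
   To see why the first delivery at a new timestamp must only flush and re-register, here is a toy, self-contained simulation of the skip-and-re-register pattern (a sketch only; the `flush`/`fire` prints stand in for `advanceProgress` and the actual window emission, which are not fully shown in the quoted diff):
   
   ```java
   import java.util.PriorityQueue;
   
   public class TimerSkipSketch {
       public static void main(String[] args) {
           // Two keys registered a processing-time timer for the same timestamp.
           PriorityQueue<Long> timers = new PriorityQueue<>();
           timers.add(100L);
           timers.add(100L);
           long lastTriggered = Long.MIN_VALUE;
           while (!timers.isEmpty()) {
               long ts = timers.poll();
               if (ts > lastTriggered) {
                   // first timer at this timestamp: flush, re-register, skip firing
                   lastTriggered = ts;
                   System.out.println("flush buffer into state up to " + ts);
                   timers.add(ts);
                   continue;
               }
               // re-delivered timers fire against the already-flushed state
               System.out.println("fire window for timer @ " + ts);
           }
       }
   }
   ```
   
   The output is a single flush followed by one firing per timer, which is the duplicate-output-free behavior the `return` guarantees.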




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14708:
URL: https://github.com/apache/flink/pull/14708#issuecomment-763516524


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "888fb9f7a4fee6a724bdbde0550f0192008af55e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12280",
       "triggerID" : "888fb9f7a4fee6a724bdbde0550f0192008af55e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "480de3a5924eb8a615e173e3044a2e6cf22e16e4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "480de3a5924eb8a615e173e3044a2e6cf22e16e4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 888fb9f7a4fee6a724bdbde0550f0192008af55e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12280) 
   * 480de3a5924eb8a615e173e3044a2e6cf22e16e4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] leonardBang commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r561827373



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -20,9 +20,13 @@
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 
 /** Utility to create a {@link StateTtlConfig} object. */
-public class StateTtlConfigUtil {
+public class StateConfigUtil {
+
+    private static final String ROCKSDB_KEYED_STATE_BACKEDN =

Review comment:
       typo
   ```suggestion
       private static final String ROCKSDB_KEYED_STATE_BACKEND =
   ``` 

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements, which have window start and window
+     * end timestamps attached, to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of the window end field in the input row, must not be a
+     *     negative value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Slice Assigners
+    // --------------------------------------------------------------------------------------------
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, but got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+            return start + size;
+        }
+
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /** The {@link SliceAssigner} for hopping windows. */
+    public static final class HoppingSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, offset.toMillis());
+        }
+
+        private final long size;
+        private final long slide;
+        private final long offset;
+        private final long sliceSize;
+        private final int numSlicesPerWindow;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires that size be an integral multiple of slide, but got size %dms and slide %dms.",
+                                size, slide));
+            }
+            this.size = size;
+            this.slide = slide;
+            this.offset = offset;
+            this.sliceSize = ArithmeticUtils.gcd(size, slide);
+            this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
+            return start + sliceSize;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            // we need to cleanup the first slice of the window
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + sliceSize;
+            reuseList.reset(firstSliceEnd);
+            return reuseList;
+        }
+
+        @Override
+        public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
+            // the iterable to list all the slices of the triggered window
+            Iterable<Long> toBeMerged =
+                    new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
+            // null namespace means use heap data views, instead of state views
+            callback.merge(null, toBeMerged);
+        }
+
+        @Override
+        public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
+            if (isWindowEmpty.get()) {
+                return Optional.empty();
+            } else {
+                return Optional.of(windowEnd + sliceSize);
+            }
+        }
+    }
+
+    /** The {@link SliceAssigner} for cumulative windows. */
+    public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
+        public CumulativeSliceAssigner withOffset(Duration offset) {
+            return new CumulativeSliceAssigner(rowtimeIndex, maxSize, step, offset.toMillis());
+        }
+
+        private final long maxSize;
+        private final long step;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected CumulativeSliceAssigner(int rowtimeIndex, long maxSize, long step, long offset) {
+            super(rowtimeIndex);
+            if (maxSize <= 0 || step <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+            if (maxSize % step != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window requires that maxSize be an integral multiple of step, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+
+            this.maxSize = maxSize;
+            this.step = step;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
+            return start + step;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + step;
+            long lastSliceEnd = windowStart + maxSize;
+            if (windowEnd == firstSliceEnd) {
+                // we reuse state in the first slice, skip cleanup for the first slice
+                return Collections.emptyList();

Review comment:
       ```suggestion
                   return reuseList.clear();
   ```
   And I think we can rename `reuseList` to `expiredSlices`, which describes the logic better.
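   
   Since `ReusableListIterable` itself is not quoted in this diff, here is a hypothetical minimal version of such a reusable iterable, to show why `reset(...)` avoids allocating a new collection per invocation (a sketch under that assumption, not the PR code):
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   
   public class ReusableListIterableSketch implements Iterable<Long> {
   
       // One backing list for the lifetime of the assigner; reset(...) refills it,
       // so expiredSlices() does not allocate a fresh collection on the hot path.
       private final List<Long> list = new ArrayList<>();
   
       ReusableListIterableSketch reset(long sliceEnd) {
           list.clear();
           list.add(sliceEnd);
           return this;
       }
   
       @Override
       public Iterator<Long> iterator() {
           return list.iterator();
       }
   
       public static void main(String[] args) {
           ReusableListIterableSketch reuse = new ReusableListIterableSketch();
           for (long end : reuse.reset(15_000L)) {
               System.out.println(end); // prints 15000
           }
       }
   }
   ```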

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
##########
@@ -94,7 +94,8 @@ public BytesMap(
             MemoryManager memoryManager,
             long memorySize,
             TypeSerializer<K> keySerializer) {
-        this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, memorySize);

Review comment:
        * Base class for {@link BytesHashMap} and {@link BytesMultiMap}.
   

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
##########
@@ -37,37 +34,26 @@
     private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
 
     private final Object owner;
-    private final @Nullable MemoryManager memoryManager;
+    private final MemoryManager memoryManager;
     private final ArrayList<MemorySegment> cachePages;
     private final int maxPages;
     private final int perRequestPages;
 
     private int pageUsage;
 
-    public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, long memorySize) {
-        this(
-                owner,
-                memoryManager,
-                (int) memorySize
-                        / (memoryManager == null
-                                ? MemoryManager.DEFAULT_PAGE_SIZE
-                                : memoryManager.getPageSize()));
-    }
-
     public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, int maxPages) {

Review comment:
       The code path may come in with a nullable `MemoryManager`; see `BufferDataOverWindowOperator` and `DummyEnvironment`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -218,7 +218,7 @@ public void allocatePages(Object owner, Collection<MemorySegment> target, int nu
         Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
         Preconditions.checkArgument(
                 numberOfPages <= totalNumberOfPages,
-                "Cannot allocate more segments %d than the max number %d",
+                "Cannot allocate more segments %s than the max number %s",

Review comment:
       Nice catch! Could you also fix `Illegal negative checkpoint id: %d` in `FsCheckpointStorageAccess`?
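   
   For context on why `%d` misbehaves here: this fix implies that `Preconditions` uses a Guava-style lenient formatter that substitutes only `%s` placeholders. A rough sketch of that behavior (the exact leftover-argument formatting of the real implementation may differ):
   
   ```java
   public class LenientFormatSketch {
   
       // Guava-style lenient formatting: only %s is substituted; anything else
       // (e.g. %d) stays verbatim and leftover arguments are appended at the end.
       static String lenientFormat(String template, Object... args) {
           StringBuilder sb = new StringBuilder();
           int start = 0;
           int a = 0;
           int idx;
           while (a < args.length && (idx = template.indexOf("%s", start)) >= 0) {
               sb.append(template, start, idx).append(args[a++]);
               start = idx + 2;
           }
           sb.append(template.substring(start));
           if (a < args.length) { // append leftover arguments
               sb.append(" [").append(args[a++]);
               while (a < args.length) {
                   sb.append(", ").append(args[a++]);
               }
               sb.append("]");
           }
           return sb.toString();
       }
   
       public static void main(String[] args) {
           System.out.println(lenientFormat(
                   "Cannot allocate more segments %d than the max number %d", 5, 3));
           // -> Cannot allocate more segments %d than the max number %d [5, 3]
           System.out.println(lenientFormat(
                   "Cannot allocate more segments %s than the max number %s", 5, 3));
           // -> Cannot allocate more segments 5 than the max number 3
       }
   }
   ```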

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
+
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.apache.flink.table.runtime.util.WindowKey;
+import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
+import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link WindowBuffer} that buffers input elements in a {@link
+ * WindowBytesMultiMap} and combines buffered elements into state when flushing.
+ */
+public final class RecordsWindowBuffer implements WindowBuffer {
+
+    private final WindowCombineFunction combineFunction;
+    private final WindowBytesMultiMap recordsBuffer;
+    private final WindowKey reuseWindowKey;
+    private final RowDataSerializer recordSerializer;
+
+    private long minTriggerTime = Long.MAX_VALUE;
+
+    public RecordsWindowBuffer(
+            Object operatorOwner,
+            MemoryManager memoryManager,
+            long memorySize,
+            WindowCombineFunction combineFunction,
+            LogicalType[] keyTypes,
+            RowType inputType) {
+        this.combineFunction = combineFunction;
+        LogicalType[] inputFieldTypes =
+                inputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        this.recordsBuffer =
+                new WindowBytesMultiMap(
+                        operatorOwner, memoryManager, memorySize, keyTypes, inputFieldTypes);
+        this.recordSerializer = new RowDataSerializer(inputFieldTypes);
+        this.reuseWindowKey = new WindowKeySerializer(keyTypes.length).createInstance();
+    }
+
+    @Override
+    public void addElement(BinaryRowData key, long sliceEnd, RowData element) throws Exception {
+        // track the lowest trigger time, if watermark exceeds the trigger time, it means there
+        minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
+
+        reuseWindowKey.replace(sliceEnd, key);
+        LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
+        try {
+            recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
+        } catch (EOFException e) {
+            // buffer is full, flush it to state
+            flush();
+            // remember to add the input element again
+            addElement(key, sliceEnd, element);
+        }
+    }
+
+    @Override
+    public void advanceProgress(long progress) throws Exception {
+        if (progress >= minTriggerTime) {

Review comment:
       ```suggestion
           if (progress > minTriggerTime) {
   ```
   When progress == minTriggerTime, the watermark won't advance.
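   
   For reference, the boundary semantics this suggestion hinges on, as a tiny runnable sketch (illustrative values only):
   
   ```java
   public class TriggerBoundarySketch {
       public static void main(String[] args) {
           // A slice ending at sliceEnd contains timestamps up to sliceEnd - 1;
           // minTriggerTime in the diff tracks the minimum of these max timestamps.
           long sliceEnd = 15_000;
           long maxTimestamp = sliceEnd - 1;
           long watermark = 14_999;
           System.out.println(watermark >= maxTimestamp); // true  -> flush/fire at 14999
           System.out.println(watermark > maxTimestamp);  // false -> would wait for 15000
       }
   }
   ```
   
   Whether the flush should happen at `>=` or `>` therefore comes down to whether a watermark equal to the max timestamp is considered to complete the slice.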

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceSharedAssigner.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link SliceAssigner} which shares slices across windows, which means a window is divided into
+ * multiple slices and the slices need to be merged into windows when the windows are emitted.
+ *
+ * <p>A classical window of a {@link SliceSharedAssigner} is the hopping window.
+ */
+@Internal
+public interface SliceSharedAssigner extends SliceAssigner {
+
+    /**
+     * Determines which slices (if any) should be merged.
+     *
+     * @param sliceEnd the triggered slice, identified by end timestamp
+     * @param callback a callback that can be invoked to signal which slices should be merged.
+     */
+    void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;
+
+    /**
+     * Returns the optional end timestamp of the next window which should be triggered. Empty if
+     * there is no following window to trigger for now.
+     *
+     * <p>The purpose of this method is to avoid registering too many timers for each hopping and
+     * cumulative slice, e.g. HOP(1day, 10s) needs to register 4300 timers for every slice. In order to

Review comment:
       Should it be 24 * 60 * 60 / GCD(10, 24 * 60 * 60) - 1 = 8639?
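   
   For what it's worth, the arithmetic in question as a one-line check (a sketch, not PR code):
   
   ```java
   public class TimerCountSketch {
       public static void main(String[] args) {
           long windowSize = 24 * 60 * 60; // HOP size: 1 day, in seconds
           long slide = 10;                // HOP slide: 10 seconds
           long sliceSize = java.math.BigInteger.valueOf(windowSize)
                   .gcd(java.math.BigInteger.valueOf(slide))
                   .longValue();           // gcd(86400, 10) = 10
           // slices per window, minus the slice that just fired:
           System.out.println(windowSize / sliceSize - 1); // prints 8639
       }
   }
   ```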

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements, which have window start and window
+     * end timestamps attached, to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of the window end field in the input row, must not be a
+     *     negative value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Slice Assigners
+    // --------------------------------------------------------------------------------------------
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, but got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+            return start + size;
+        }
+
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /** The {@link SliceAssigner} for hopping windows. */
+    public static final class HoppingSliceAssigner extends AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, offset.toMillis());
+        }
+
+        private final long size;
+        private final long slide;
+        private final long offset;
+        private final long sliceSize;
+        private final int numSlicesPerWindow;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires that size be an integral multiple of slide, but got size %dms and slide %dms.",

Review comment:
       Hopping windows don't have this limitation in general; only the sliced hopping window implementation requires it. We can improve the message.
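   
   To illustrate the distinction: generic hopping windows can still be sliced by the gcd even when the size is not a multiple of the slide; the restriction stems from this sliced implementation. A sketch of the non-multiple case (for illustration, not the PR's behavior):
   
   ```java
   public class GcdSlicingSketch {
       public static void main(String[] args) {
           long size = 15_000;  // window size NOT a multiple of the slide
           long slide = 10_000;
           long sliceSize = java.math.BigInteger.valueOf(size)
                   .gcd(java.math.BigInteger.valueOf(slide))
                   .longValue();                   // gcd = 5000
           System.out.println(size / sliceSize);   // 3 slices per window
           System.out.println(slide / sliceSize);  // each hop advances 2 slices
       }
   }
   ```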

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/StateKeyContext.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.state;
+
+import org.apache.flink.table.data.RowData;
+
+/** Context to switch current key in state backend. */
+public interface StateKeyContext {
+    /** Sets current state key to given value. */

Review comment:
       minor: Add one blank line at the beginning of the class.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // --------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param size the size of the generated windows.
+     * @param slide the slide interval of the generated windows.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows.
+     *
+     * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on
+     *     processing time.
+     * @param maxSize the max size of the generated windows.
+     * @param step the step interval of the generated windows.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements which have window start and window end
+     * timestamps attached to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of window end field in the input row, mustn't be a negative
+     *     value.
+     * @param windowSize the size of the generated window.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis());
+    }
+
+    // --------------------------------------------------------------------------------
+    // Slice Assigners
+    // --------------------------------------------------------------------------------
+
+    /** The {@link SliceAssigner} for tumbling windows. */
+    public static final class TumblingSliceAssigner extends AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link TumblingSliceAssigner} with the specified offset. */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis());
+        }
+
+        private final long size;
+        private final long offset;
+        private final ReusableListIterable reuseList = new ReusableListIterable();
+
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
+                            size));
+            checkArgument(
+                    Math.abs(offset) < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy abs(offset) < size, but got size %dms and offset %dms.",

Review comment:
       ```suggestion
                            "Tumbling Window parameters must satisfy abs(offset) < size, but got size %d ms and offset %d ms.",
       ```

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
+
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.apache.flink.table.runtime.util.WindowKey;
+import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
+import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link WindowBuffer} that buffers input elements in a {@link
+ * WindowBytesMultiMap} and combines buffered elements into state when flushing.
+ */
+public final class RecordsWindowBuffer implements WindowBuffer {
+
+    private final WindowCombineFunction combineFunction;
+    private final WindowBytesMultiMap recordsBuffer;
+    private final WindowKey reuseWindowKey;
+    private final RowDataSerializer recordSerializer;
+
+    private long minTriggerTime = Long.MAX_VALUE;
+
+    public RecordsWindowBuffer(
+            Object operatorOwner,
+            MemoryManager memoryManager,
+            long memorySize,
+            WindowCombineFunction combineFunction,
+            LogicalType[] keyTypes,
+            RowType inputType) {
+        this.combineFunction = combineFunction;
+        LogicalType[] inputFieldTypes =
+                inputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        this.recordsBuffer =
+                new WindowBytesMultiMap(
+                        operatorOwner, memoryManager, memorySize, keyTypes, inputFieldTypes);
+        this.recordSerializer = new RowDataSerializer(inputFieldTypes);
+        this.reuseWindowKey = new WindowKeySerializer(keyTypes.length).createInstance();
+    }
+
+    @Override
+    public void addElement(BinaryRowData key, long sliceEnd, RowData element) throws Exception {
+        // track the lowest trigger time, if watermark exceeds the trigger time, it means there
+        minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);

Review comment:
       It looks like part of this comment is missing; the sentence breaks off mid-thought.
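       Based on how `minTriggerTime` is used, the note presumably continues along these lines (my guess at the intent, not the author's wording):
    ```java
    // track the lowest trigger time; if the watermark exceeds the trigger time,
    // it means there are buffered records whose windows should fire, so the
    // buffer must be flushed into state before the windows can be emitted
    minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
    ```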

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link SlicingWindowOperator} implements optimized processing for aligned windows to which
+ * the slicing optimization can be applied. The core idea of the slicing optimization is to divide
+ * all elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
+ *
+ * <h3>Concept of Aligned Window and Unaligned Window</h3>
+ *
+ * <p>We divide windows into 2 categories: Aligned Windows and Unaligned Windows.
+ *
+ * <p>Aligned Windows are windows that have predetermined window boundaries and can be divided
+ * into a finite number of non-overlapping chunks. The boundary of an aligned window is determined
+ * independently of the time characteristics of the data stream or the messages it receives. For
+ * example, a hopping (sliding) window is an aligned window, as its boundaries are predetermined
+ * based on the window size and slide. Aligned windows include tumbling, hopping, and cumulative
+ * windows.
+ *
+ * <p>Unaligned Windows are windows whose boundaries are determined dynamically based on the
+ * elements they contain. For example, a session window is an unaligned window, as its boundaries
+ * are determined based on the messages' timestamps and their correlations. Currently, the session
+ * window is the only unaligned window.
+ *
+ * <p>Because aligned windows can be divided into a finite number of non-overlapping chunks
+ * (a.k.a. slices), efficient processing can be applied to share intermediate results across
+ * windows.
+ *
+ * <h3>Concept of Slice</h3>
+ *
+ * <p>A slice is one of the finite, non-overlapping chunks that an aligned window is divided into.
+ * Slices have the following properties:
+ *
+ * <ul>
+ *   <li>An element must only belong to a single slice.
+ *   <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
+ *   <li>A window consists of a finite number of slices.
+ * </ul>
+ *
+ * <h3>Abstraction of Slicing Window Operator</h3>
+ *
+ * <p>A slicing window operator is a simple wrapper around a {@link SlicingWindowProcessor}. It
+ * delegates all the important methods to the underlying processor, which can have different
+ * implementations for aggregate, topk, and other operations.
+ *
+ * <p>A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices
+ * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example.
+ *
+ * <p>Note: since {@link SlicingWindowProcessor} leverages the slicing optimization for aligned
+ * windows, it doesn't support unaligned windows, e.g. the session window.
+ *
+ * <p>Note: currently, {@link SlicingWindowOperator} doesn't support early-fire and late-arrival.
+ * Thus late elements (elements belonging to already emitted windows) will simply be dropped.
+ */
+@Internal
+public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+
+    /** The concrete window operator implementation. */
+    private final SlicingWindowProcessor<W> windowProcessor;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** The service to register timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    /** The tracked processing time triggered last time. */
+    private transient long lastTriggeredProcessingTime;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) {
+        this.windowProcessor = windowProcessor;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        lastTriggeredProcessingTime = Long.MIN_VALUE;
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        internalTimerService =
+                getInternalTimerService(
+                        "window-timers", windowProcessor.createWindowSerializer(), this);
+
+        windowProcessor.open(
+                new WindowProcessorContext<>(
+                        getContainingTask(),
+                        getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        internalTimerService,
+                        getKeyedStateBackend(),
+                        collector,
+                        getRuntimeContext()));
+
+        // metrics
+        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowProcessor.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowProcessor.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        BinaryRowData currentKey = (BinaryRowData) getCurrentKey();
+        boolean isElementDropped = windowProcessor.processElement(currentKey, inputRow);
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        windowProcessor.advanceProgress(mark.getTimestamp());
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        onTimer(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        long timestamp = timer.getTimestamp();
+        if (timestamp > lastTriggeredProcessingTime) {
+            // similar to the watermark advance,
+            // we need to notify WindowProcessor first to flush buffer into state
+            lastTriggeredProcessingTime = timestamp;
+            windowProcessor.advanceProgress(timestamp);
+            // register timer again, because we need to skip current timer to avoid duplicate output
+            setCurrentKey(timer.getKey());
+            internalTimerService.registerProcessingTimeTimer(timer.getNamespace(), timestamp);

Review comment:
       I'm confused about this piece of code: `lastTriggeredProcessingTime` is always the biggest timestamp across multiple keys, but the watermark is always the smallest across multiple keys. The test result is as expected even if I call `onTimer` for the current timer.
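       For reference, the alternative described above would look roughly like this (a sketch of what was tested, not the PR's code; it reuses the class's existing `onTimer`, `windowProcessor`, and `lastTriggeredProcessingTime` members):
    ```java
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        long timestamp = timer.getTimestamp();
        if (timestamp > lastTriggeredProcessingTime) {
            // flush the buffer into state before firing, same as the watermark path
            lastTriggeredProcessingTime = timestamp;
            windowProcessor.advanceProgress(timestamp);
        }
        // fire the current timer directly instead of re-registering it
        onTimer(timer);
    }
    ```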




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14708: [FLINK-21054][table-runtime-blink] Implement mini-batch optimized slicing window aggregate operator

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563298480



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
##########
@@ -37,37 +34,26 @@
     private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
 
     private final Object owner;
-    private final @Nullable MemoryManager memoryManager;
+    private final MemoryManager memoryManager;
     private final ArrayList<MemorySegment> cachePages;
     private final int maxPages;
     private final int perRequestPages;
 
     private int pageUsage;
 
-    public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, long memorySize) {
-        this(
-                owner,
-                memoryManager,
-                (int) memorySize
-                        / (memoryManager == null
-                                ? MemoryManager.DEFAULT_PAGE_SIZE
-                                : memoryManager.getPageSize()));
-    }
-
     public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, int maxPages) {

Review comment:
       The nullable `MemoryManager` was introduced this month, see facd51fcf5c78c5f733c203c38e3a093ba8a5218. It was introduced because streaming operators couldn't use the MemoryManager at that time, so we had to allocate memory from the JVM heap. But now that FLINK-20860 is resolved, we don't need this hack anymore.

       `BufferDataOverWindowOperator` uses `getContainingTask().getEnvironment().getMemoryManager()` to get the `MemoryManager`, which should never be null in our cases, because we will never use `DummyEnvironment` in our tests.
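       For illustration, the simplified construction path now looks roughly like this (a sketch based on the description above; `owner` and `memorySize` are illustrative names, and the page-count computation is an assumption mirroring the removed constructor):
    ```java
    // MemoryManager comes from the task environment and is non-null here
    MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();
    // derive the page budget from the operator's memory size
    int maxPages = (int) (memorySize / memoryManager.getPageSize());
    LazyMemorySegmentPool pool = new LazyMemorySegmentPool(owner, memoryManager, maxPages);
    ```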




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org