Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/27 11:43:00 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

HuangXingBo opened a new pull request #14775:
URL: https://github.com/apache/flink/pull/14775


   ## What is the purpose of the change
   
   *This pull request introduces `PythonStreamGroupWindowAggregateOperator` to support Python stream group window aggregation.*
   
   
   ## Brief change log
   
     - *Introduce `PythonStreamGroupWindowAggregateOperator`*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *UT in `PythonStreamGroupWindowAggregateOperatorTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 7df0cb198efffa5c5dd7d6cbe98e4db63f8e4269 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12554) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] WeiZhong94 commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r568385212



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;

Review comment:
       If we do not plan to support early fire/late fire in the first version of the Python group window aggregate, the `allowedLateness` field is unnecessary.
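The Javadoc above defines lateness in terms of the `window.maxTimestamp + allowedLateness` landmark. The `REGISTER_*`/`DELETE_*` byte constants presumably encode timer requests coming back from the Python worker. The following is a minimal, hypothetical sketch of that bookkeeping. The class and method names are illustrative and not part of Flink. The opcode values are copied from the diff. The overflow guard, which saturates at `Long.MAX_VALUE`, is an assumption and is not shown in the PR.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of window timer bookkeeping; not the operator in this PR. */
final class WindowTimerSketch {
    // Opcode values mirror the constants in the diff above.
    static final byte REGISTER_EVENT_TIMER = 0;
    static final byte REGISTER_PROCESSING_TIMER = 1;
    static final byte DELETE_EVENT_TIMER = 2;
    static final byte DELETE_PROCESSING_TIMER = 4;

    final TreeSet<Long> eventTimers = new TreeSet<>();
    final TreeSet<Long> processingTimers = new TreeSet<>();

    /** Applies one timer request, e.g. one decoded from the Python worker's output. */
    void apply(byte opcode, long timestamp) {
        switch (opcode) {
            case REGISTER_EVENT_TIMER:
                eventTimers.add(timestamp);
                break;
            case REGISTER_PROCESSING_TIMER:
                processingTimers.add(timestamp);
                break;
            case DELETE_EVENT_TIMER:
                eventTimers.remove(timestamp);
                break;
            case DELETE_PROCESSING_TIMER:
                processingTimers.remove(timestamp);
                break;
            default:
                throw new IllegalArgumentException("Unknown timer opcode: " + opcode);
        }
    }

    /** window.maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE on overflow. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /** An event-time window counts as late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }

    /** Removes every event timer at or before the watermark and returns how many fired. */
    int advanceWatermark(long watermark) {
        NavigableSet<Long> due = eventTimers.headSet(watermark, true);
        int fired = due.size();
        due.clear(); // clearing the view removes the timers from eventTimers
        return fired;
    }
}
```

If early/late firing is dropped as the review suggests, `allowedLateness` is effectively 0. In that case `cleanupTime` reduces to the window's max timestamp, which is why the field adds nothing in a first version.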







[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569215914



##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when bundled element count reach to max bundle size
+ *   <li>FinishBundle is called when bundled time reach to max bundle time
+ *   <li>Watermarks are buffered and only sent to downstream when finishedBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(

Review comment:
       Yes. It can remove a lot of duplicated code in the test.
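The test above feeds rows built with `newRow(flag, ...)`. This sketch assumes that `true` marks an accumulate message and `false` a retraction, which is what the class Javadoc's "Retraction flag is handled correctly" item suggests. Under that assumption, a retractable COUNT shows the per-key effect of the six inputs. The helper is hypothetical and ignores windowing and the Python runner entirely. It only shows why key `c3` should net to zero.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a per-key COUNT that honours accumulate/retract flags. */
final class RetractableCountSketch {
    private final Map<String, Long> counts = new LinkedHashMap<>();

    /** accumulate == true adds one to the key's count; false retracts an earlier row. */
    void process(boolean accumulate, String key) {
        counts.merge(key, accumulate ? 1L : -1L, Long::sum);
    }

    /** Current net count for the key, 0 if the key was never seen. */
    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }
}
```

Feeding it the keys from the test in order, with `(true, "c1")` three times, `(true, "c2")`, `(true, "c3")`, and `(false, "c3")`, leaves `c1` at 3, `c2` at 1 and `c3` at 0.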







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12896) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12896) 
   * f2494304b2c16c02b89ee96864ff1e61f446f203 UNKNOWN
   



[GitHub] [flink] WeiZhong94 commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r582663613



##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -194,8 +194,11 @@ message UserDefinedAggregateFunctions {
   // True if the count(*) agg is inserted by the planner.
   bool count_star_inserted = 11;
 
+  // Whether has Group Window.
+  bool has_group_window = 12;

Review comment:
       We can use the `HasField` method in Python to check whether the field is present.
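The point of the suggestion is that a message-typed field already tracks its own presence, so a separate `has_group_window` flag duplicates that information. In Python, `HasField` reports presence for submessage fields. For proto3 scalar fields declared without `optional`, it raises `ValueError` instead. Generated Java code offers a matching `has...()` accessor for message fields. The exact name of the group window field in this PR is not shown here. The plain-Java sketch below uses hypothetical stand-in classes rather than the generated protobuf code, and shows presence carried by the field itself.

```java
import java.util.Optional;

/** Hypothetical stand-ins for the proto messages; not generated protobuf classes. */
final class PresenceSketch {
    static final class GroupWindowSpec {
        final long windowSize;

        GroupWindowSpec(long windowSize) {
            this.windowSize = windowSize;
        }
    }

    static final class AggregateFunctionsSpec {
        // null means "no group window": presence lives in the field itself,
        // so there is no separate boolean that could drift out of sync with it.
        private final GroupWindowSpec groupWindow;

        AggregateFunctionsSpec(GroupWindowSpec groupWindow) {
            this.groupWindow = groupWindow;
        }

        boolean hasGroupWindow() {
            return groupWindow != null;
        }

        Optional<GroupWindowSpec> getGroupWindow() {
            return Optional.ofNullable(groupWindow);
        }
    }
}
```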







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * d8f3c75d291d0050ee56f82aa019418718bf87a5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13732) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * f2494304b2c16c02b89ee96864ff1e61f446f203 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13697) 
   * d8f3c75d291d0050ee56f82aa019418718bf87a5 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 3f0036901cb06dd1f641d0e7243c6da486602b86 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12861) 
   



[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569214315



##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -147,6 +147,30 @@ message UserDefinedAggregateFunction {
   bool takes_row_as_input = 6;
 }
 
+message GroupWindow {
+  enum WindowType {
+    TUMBLING_GROUP_WINDOW = 0;
+    SLIDING_GROUP_WINDOW = 1;
+    SESSION_GROUP_WINDOW = 2;
+  }
+
+  WindowType window_type = 1;
+
+  bool is_time_window = 2;
+
+  int64 window_slide = 3;
+
+  int64 window_size = 4;
+

Review comment:
       Good suggestion. The schema information is transmitted only once, so adding one more field will not affect performance.
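For time windows, the `window_size` and `window_slide` fields in the `GroupWindow` message are enough to assign a timestamp to its windows. The sketch below is illustrative only and is not the PR's implementation. It assumes an offset of 0 and half-open `[start, end)` windows. A tumbling window is treated as the special case where the step equals the size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of time-window assignment from size/slide (offset 0). */
final class TimeWindowAssignmentSketch {
    /** Start of the window of length {@code step} that contains {@code timestamp}. */
    static long windowStart(long timestamp, long step) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestamp - Math.floorMod(timestamp, step);
    }

    /** Tumbling window: exactly one [start, start + size) window per timestamp. */
    static List<long[]> tumbling(long timestamp, long size) {
        List<long[]> windows = new ArrayList<>();
        long start = windowStart(timestamp, size);
        windows.add(new long[] {start, start + size});
        return windows;
    }

    /** Sliding window: every slide-aligned [start, start + size) that covers timestamp, latest first. */
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For example, with a size of 10000 ms and a slide of 5000 ms, a row at 6000 ms falls into `[5000, 15000)` and `[0, 10000)`.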







[GitHub] [flink] HuangXingBo commented on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-788549281


   The failed test has been reported in [FLINK-21147](https://issues.apache.org/jira/browse/FLINK-21147) and is not related to this PR.





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12896) 
   * f2494304b2c16c02b89ee96864ff1e61f446f203 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13697) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-runs the last Travis build
    - `@flinkbot run azure` re-runs the last Azure build
   </details>



[GitHub] [flink] WeiZhong94 commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r568352971



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
##########
@@ -76,8 +61,7 @@
  */

Review comment:
       The Javadoc needs to be updated.

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -147,6 +147,30 @@ message UserDefinedAggregateFunction {
   bool takes_row_as_input = 6;
 }
 
+message GroupWindow {
+  enum WindowType {
+    TUMBLING_GROUP_WINDOW = 0;
+    SLIDING_GROUP_WINDOW = 1;
+    SESSION_GROUP_WINDOW = 2;
+  }
+
+  WindowType window_type = 1;
+
+  bool is_time_window = 2;
+
+  int64 window_slide = 3;
+
+  int64 window_size = 4;
+

Review comment:
       Currently the gap of the session window is stored in the `window_size` field. I think we could add a new `window_gap` field to make this easier to understand.
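To illustrate the suggestion, here is a minimal Java sketch of a descriptor that mirrors the proposed `GroupWindow` message but gives the session gap its own field. `GroupWindowSpec`, its factory methods and `slidingWindowStarts` are hypothetical names, not part of this PR or the Flink API. The window-start arithmetic follows the usual sliding-window assignment rule with offset 0 and assumes non-negative timestamps; only time windows are modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Java mirror of the proposed GroupWindow message with a dedicated gap field. */
final class GroupWindowSpec {
    enum WindowType { TUMBLING_GROUP_WINDOW, SLIDING_GROUP_WINDOW, SESSION_GROUP_WINDOW }

    final WindowType windowType;
    final boolean isTimeWindow; // always true in this sketch
    final long windowSlide;     // sliding windows only
    final long windowSize;      // tumbling and sliding windows
    final long windowGap;       // session windows only, instead of overloading windowSize

    private GroupWindowSpec(WindowType type, boolean isTimeWindow, long slide, long size, long gap) {
        this.windowType = type;
        this.isTimeWindow = isTimeWindow;
        this.windowSlide = slide;
        this.windowSize = size;
        this.windowGap = gap;
    }

    static GroupWindowSpec tumbling(long size) {
        return new GroupWindowSpec(WindowType.TUMBLING_GROUP_WINDOW, true, 0L, size, 0L);
    }

    static GroupWindowSpec sliding(long size, long slide) {
        return new GroupWindowSpec(WindowType.SLIDING_GROUP_WINDOW, true, slide, size, 0L);
    }

    static GroupWindowSpec session(long gap) {
        // windowSize stays 0, so nobody has to know that "size" secretly means "gap" here.
        return new GroupWindowSpec(WindowType.SESSION_GROUP_WINDOW, true, 0L, 0L, gap);
    }

    /** Starts of all sliding windows containing ts, newest first (offset 0, ts >= 0 assumed). */
    List<Long> slidingWindowStarts(long ts) {
        if (windowType != WindowType.SLIDING_GROUP_WINDOW) {
            throw new IllegalStateException("not a sliding window: " + windowType);
        }
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts and aligned to the slide.
        long lastStart = ts - ((ts + windowSlide) % windowSlide);
        // Walk back one slide at a time while the window [start, start + size) still contains ts.
        for (long start = lastStart; start > ts - windowSize; start -= windowSlide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a sliding window of size 10000 ms and slide 5000 ms, `slidingWindowStarts(0L)` returns `[0, -5000]` and `slidingWindowStarts(6000L)` returns `[5000, 0]`. These are the windows [-5000, 5000), [0, 10000) and [5000, 15000), which is consistent with the windows in the expected output of `PythonStreamGroupWindowAggregateOperatorTest` quoted below.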

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These tests verify that:
+ *
+ * <ul>
+ *   <li>The retraction flag is handled correctly
+ *   <li>FinishBundle is called when a checkpoint is encountered
+ *   <li>FinishBundle is called when the bundled element count reaches the max bundle size
+ *   <li>FinishBundle is called when the bundled time reaches the max bundle time
+ *   <li>Watermarks are buffered and only sent downstream when finishBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(

Review comment:
       It would be better to extract some utility methods so that each of these stream records can be created in one line.
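A sketch of what such helpers could look like, kept free of Flink dependencies so that it compiles on its own. `ExpectedWindowRows`, `windowResult`, `cleanupResult` and `firedAndCleaned` are hypothetical names. In the real test the helpers would presumably wrap the existing `newRow(...)`, `TimestampData.fromEpochMillis(...)` and `new StreamRecord<>(...)` calls rather than return the plain `Object[]` used here. The cleanup message format is copied from the expected output in the test.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helpers that build one expected window output row per call. */
final class ExpectedWindowRows {
    private ExpectedWindowRows() {}

    /** Row emitted when a window fires: insert flag, key, aggregate value, window start, window end. */
    static Object[] windowResult(String key, long value, long windowStart, long windowEnd) {
        return new Object[] {true, key, value, windowStart, windowEnd};
    }

    /** Row emitted when a window's state is cleaned up; the value column is always 0 in the test. */
    static Object[] cleanupResult(String key, long windowStart, long windowEnd) {
        String marker =
                String.format(
                        "state_cleanup_triggered: %s : TimeWindow{start=%d, end=%d}",
                        key, windowStart, windowEnd);
        return windowResult(marker, 0L, windowStart, windowEnd);
    }

    /** A window result immediately followed by its cleanup row, the most common pair in the test. */
    static List<Object[]> firedAndCleaned(String key, long value, long windowStart, long windowEnd) {
        return Arrays.asList(
                windowResult(key, value, windowStart, windowEnd),
                cleanupResult(key, windowStart, windowEnd));
    }
}
```

Once adapted to return `StreamRecord<RowData>`, each nine-line `expectedOutput.add(...)` block would shrink to something like `expectedOutput.add(cleanupResult("c1", -5000L, 5000L))`.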

+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(10000L));
+        // the checkpoint triggers finishBundle
+        testHarness.prepareSnapshotPreBarrier(0L);
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByCount() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+
+        testHarness.processWatermark(new Watermark(10000L));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByTime() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(20000L));
+        assertOutputEquals(
+                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
+
+        testHarness.setProcessingTime(1000L);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    @Override
+    public LogicalType[] getOutputLogicalType() {
+        return new LogicalType[] {
+            DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()
+        };
+    }
+
+    @Override
+    public RowType getInputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new VarCharType()),
+                        new RowType.RowField("f3", new BigIntType()),
+                        new RowType.RowField("rowTime", new BigIntType())));
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new BigIntType()),
+                        new RowType.RowField("windowStart", new TimestampType(3)),
+                        new RowType.RowField("windowEnd", new TimestampType(3))));
+    }
+
+    @Override
+    OneInputStreamOperator getTestOperator(Configuration config) {
+        long size = 10000L;
+        long slide = 5000L;
+        SlidingWindowAssigner windowAssigner =
+                SlidingWindowAssigner.of(Duration.ofMillis(size), Duration.ofMillis(slide))
+                        .withEventTime();
+        PlannerWindowReference windowRef =
+                new PlannerWindowReference("w$", new Some<>(new TimestampType(3)));
+        LogicalWindow window =
+                new SlidingGroupWindow(
+                        windowRef,
+                        new FieldReferenceExpression(
+                                "rowtime",
+                                new AtomicDataType(
+                                        new TimestampType(true, TimestampKind.ROWTIME, 3)),
+                                0,
+                                3),
+                        intervalOfMillis(size),
+                        intervalOfMillis(slide));
+        return new PassThroughPythonStreamGroupWindowAggregateOperator(
+                config,
+                getInputType(),
+                getOutputType(),
+                new PythonAggregateFunctionInfo[] {
+                    new PythonAggregateFunctionInfo(
+                            PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE,
+                            new Integer[] {2},
+                            -1,
+                            false)
+                },
+                getGrouping(),
+                -1,
+                false,
+                false,
+                3,
+                windowAssigner,
+                window,
+                0,
+                new int[] {0, 1});
+    }
+
+    /** PassThroughPythonStreamGroupWindowAggregateOperator. */
+    public static class PassThroughPythonStreamGroupWindowAggregateOperator<K>

Review comment:
       It would be better to extract this class into an independent Java file, since it is so large.
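
The window bounds in the expected rows of the tests above come from the sliding window configured in `getTestOperator` (size 10000 ms, slide 5000 ms, event time). As a reading aid, here is a minimal standalone sketch of how a sliding assigner maps a row time to its windows. It assumes Flink's usual convention (windows are `[start, start + size)`, aligned to multiples of the slide, offset 0). The `SlidingWindows` class and its `assign` method are hypothetical; this is not the `SlidingWindowAssigner` implementation itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: computes the sliding windows [start, end) that contain a timestamp. */
final class SlidingWindows {
    private SlidingWindows() {}

    /** Returns {start, end} pairs, latest window first, assuming a window offset of 0. */
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp; floorMod keeps the
        // alignment correct for negative timestamps too.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With size 10000 and slide 5000, row time 0 maps to `[0, 10000)` and `[-5000, 5000)`, 6000 maps to `[5000, 15000)` and `[0, 10000)`, and 10000 maps to `[10000, 20000)` and `[5000, 15000)`. These are the window bounds that appear in the expected output of `testFinishBundleTriggeredByTime`.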

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. */
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;
+
+    public PythonStreamGroupWindowAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            boolean countStarInserted,
+            int inputTimeFieldIndex,
+            WindowAssigner<W> windowAssigner,
+            LogicalWindow window,
+            long allowedLateness,
+            int[] namedProperties) {
+        super(
+                config,
+                inputType,
+                outputType,
+                aggregateFunctions,
+                dataViewSpecs,
+                grouping,
+                indexOfCountStar,
+                generateUpdateBefore);
+        this.countStarInserted = countStarInserted;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.windowAssigner = windowAssigner;
+        this.allowedLateness = allowedLateness;
+        this.namedProperties = namedProperties;
+        buildWindow(window);
+    }
+
+    @Override
+    public void open() throws Exception {
+        windowSerializer = windowAssigner.getWindowSerializer(new ExecutionConfig());
+        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
+        if (isTimeWindow) {

Review comment:
       How about adding some explanation of the structure of the timer data?
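
One way to address this is to name the timer operation codes next to the code that interprets them. Below is a minimal sketch. It assumes each timer-data record carries one of the byte constants declared in the operator to identify the operation; the diff shows only the constants, not the full record layout. The `TimerOperation` enum is hypothetical. It maps `REGISTER_EVENT_TIMER = 0`, `REGISTER_PROCESSING_TIMER = 1`, `DELETE_EVENT_TIMER = 2` and `DELETE_PROCESSING_TIMER = 4` to named operations and rejects any other byte.

```java
/**
 * Hypothetical sketch: names the timer operation codes declared in
 * PythonStreamGroupWindowAggregateOperator and decodes a raw byte into one of them.
 */
enum TimerOperation {
    REGISTER_EVENT_TIMER((byte) 0),
    REGISTER_PROCESSING_TIMER((byte) 1),
    DELETE_EVENT_TIMER((byte) 2),
    // Value copied from the diff as-is; no constant there uses 3.
    DELETE_PROCESSING_TIMER((byte) 4);

    private final byte code;

    TimerOperation(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    /** Decodes the operation byte, failing fast on unknown values. */
    static TimerOperation fromByte(byte code) {
        for (TimerOperation op : values()) {
            if (op.code == code) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unsupported timer operation: " + code);
    }

    /** True for operations on event-time timers, false for processing-time timers. */
    boolean isEventTime() {
        return this == REGISTER_EVENT_TIMER || this == DELETE_EVENT_TIMER;
    }

    /** True for registrations, false for deletions. */
    boolean isRegister() {
        return this == REGISTER_EVENT_TIMER || this == REGISTER_PROCESSING_TIMER;
    }
}
```

Keeping the codes in one enum gives readers a single place that documents every operation, and an unexpected byte surfaces as an exception rather than being silently ignored.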

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when the bundled element count reaches the max bundle size
+ *   <li>FinishBundle is called when the bundled time reaches the max bundle time
+ *   <li>Watermarks are buffered and only sent downstream when finishBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(

Review comment:
       It would be better to extract a utility method that creates these stream records in one line.
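
In the tests above, each window result is followed immediately by a `state_cleanup_triggered: <key> : TimeWindow{start=..., end=...}` row with count `0` and the same window bounds, so one helper call could add both rows. The sketch below shows that shape. It uses a hypothetical `ExpectedRow` stand-in so that it compiles without Flink. In the real test, the helper would instead wrap `newRow(true, ...)` (with `TimestampData.fromEpochMillis(...)` for the bounds) in a `StreamRecord` and add it to `expectedOutput`. `ExpectedOutputs.addWindowResult` is a hypothetical name.

```java
import java.util.Collection;

/** Hypothetical stand-in for a (key, count, windowStart, windowEnd) output row. */
final class ExpectedRow {
    final String key;
    final long count;
    final long windowStart;
    final long windowEnd;

    ExpectedRow(String key, long count, long windowStart, long windowEnd) {
        this.key = key;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
}

/** Hypothetical test utility: one call per fired window instead of two multi-line records. */
final class ExpectedOutputs {
    private ExpectedOutputs() {}

    static void addWindowResult(
            Collection<ExpectedRow> out, String key, long count, long start, long end) {
        // The aggregated result for the window.
        out.add(new ExpectedRow(key, count, start, end));
        // The cleanup marker that follows it, mirroring the expected rows in the test.
        out.add(
                new ExpectedRow(
                        "state_cleanup_triggered: " + key
                                + " : TimeWindow{start=" + start + ", end=" + end + "}",
                        0L,
                        start,
                        end));
    }
}
```

For example, the last pair of expected rows before `Watermark(20000L)` in `testFinishBundleTriggeredByTime` would become `addWindowResult(expected, "c1", 2L, 10000L, 20000L)`.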

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;

Review comment:
       If we do not plan to support early fire/late fire in the first version of the Python group window aggregate, the `allowedLateness` field is unnecessary.
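
The sketch below makes the trade-off concrete by computing what the field would control. It follows the two uses listed in the field's Javadoc and is modeled on the convention of Flink's Java window operators: for event time, state is cleaned up at `window.maxTimestamp() + allowedLateness`, where `maxTimestamp = end - 1` for a `[start, end)` time window. The `LatenessSketch` class and its methods are hypothetical. `getTestOperator` appears to pass `0` for this argument. With zero lateness, the cleanup time is just `maxTimestamp`, so the field has no observable effect until late firing is supported.

```java
/** Hypothetical helpers illustrating what allowedLateness controls for a [start, end) time window. */
final class LatenessSketch {
    private LatenessSketch() {}

    /** Largest timestamp that still belongs to the window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Time at which the window's state may be cleared. */
    static long cleanupTime(long windowEnd, long allowedLateness, boolean isEventTime) {
        long max = maxTimestamp(windowEnd);
        if (!isEventTime) {
            // Allowed lateness only applies to event-time windows.
            return max;
        }
        long cleanup = max + allowedLateness;
        // Guard against overflow for windows that end near Long.MAX_VALUE.
        return cleanup >= max ? cleanup : Long.MAX_VALUE;
    }

    /** An event-time window is late (its input can be dropped) once the watermark reaches cleanup. */
    static boolean isWindowLate(
            long windowEnd, long allowedLateness, boolean isEventTime, long currentWatermark) {
        return isEventTime && cleanupTime(windowEnd, allowedLateness, true) <= currentWatermark;
    }
}
```

With zero lateness, the window `[0, 10000)` becomes late as soon as the watermark reaches 10000. With a lateness of 5000 ms, it would stay open until the watermark reaches 14999.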

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. */
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;

Review comment:
       ditto
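The `windowAssigner`, `size`, and `slide` fields quoted above determine which windows an element belongs to. Below is a minimal sketch of sliding-window assignment, assuming zero offset and windows that cover `[start, end)`. `SimpleTimeWindow` and `SlidingWindowSketch` are hypothetical names, not Flink classes. The sketch uses `Math.floorMod` so that negative timestamps also round down to a slide boundary.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified [start, end) window; not Flink's TimeWindow class. */
final class SimpleTimeWindow {
    final long start;
    final long end;

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "TimeWindow{start=" + start + ", end=" + end + "}";
    }
}

/** Hypothetical helper that mimics sliding-window assignment with a zero offset. */
final class SlidingWindowSketch {
    /** Start of the slide-aligned boundary at or before the timestamp. */
    static long windowStart(long timestamp, long offset, long slide) {
        // floorMod keeps the remainder non-negative, so negative timestamps round down too.
        return timestamp - Math.floorMod(timestamp - offset, slide);
    }

    /** Returns every window of length size, advancing by slide, that contains the timestamp. */
    static List<SimpleTimeWindow> assign(long timestamp, long size, long slide) {
        List<SimpleTimeWindow> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new SimpleTimeWindow(start, start + size));
        }
        return windows;
    }
}
```

With `size = 10000` ms and `slide = 5000` ms, every element falls into `size / slide = 2` windows. For example, a timestamp of 6000 is assigned to `[5000, 15000)` and `[0, 10000)`.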

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. */
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;

Review comment:
       This field can also be transient.
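The reviewer's point is that `reuseTimerData` is runtime-only state, like the other `transient` fields above. Operator instances are typically shipped to tasks through Java serialization, so a field created in `open()` does not need to be part of the serialized form. The following hypothetical sketch, which does not use Flink classes, shows that a `transient` field comes back as `null` after a serialization round trip and has to be re-created in an `open()`-style method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for an operator: configuration is serialized, runtime buffers are not. */
class TransientFieldSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Configuration shipped with the operator; part of the serialized form. */
    final int timerDataLength;

    /** Reusable buffer created in open(); transient, so it is never serialized. */
    transient long[] reuseTimerData;

    TransientFieldSketch(int timerDataLength) {
        this.timerDataLength = timerDataLength;
    }

    /** Mirrors an operator's open(): (re)creates runtime-only state after deserialization. */
    void open() {
        reuseTimerData = new long[timerDataLength];
    }

    /** Round-trips an instance through Java serialization, as when shipping it to a task. */
    static TransientFieldSketch roundTrip(TransientFieldSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientFieldSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```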

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;

Review comment:
       Why not 3?
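The thread does not explain why `DELETE_PROCESSING_TIMER` is `4` rather than `3`. These bytes are presumably decoded on the Python side, so any renumbering would have to change there in lockstep. The sketch below is hypothetical and is not the operator's code. It shows a contiguous encoding, as the reviewer suggests, together with a decoder that fails loudly on an unknown code instead of silently ignoring it.

```java
/** Hypothetical decoder for a timer-operation byte; values must agree with the consumer side. */
final class TimerOperationSketch {
    static final byte REGISTER_EVENT_TIMER = 0;
    static final byte REGISTER_PROCESSING_TIMER = 1;
    static final byte DELETE_EVENT_TIMER = 2;
    static final byte DELETE_PROCESSING_TIMER = 3; // contiguous with the codes above

    /** Maps a timer-operation code to a description, rejecting unknown codes. */
    static String describe(byte op) {
        switch (op) {
            case REGISTER_EVENT_TIMER:
                return "register event-time timer";
            case REGISTER_PROCESSING_TIMER:
                return "register processing-time timer";
            case DELETE_EVENT_TIMER:
                return "delete event-time timer";
            case DELETE_PROCESSING_TIMER:
                return "delete processing-time timer";
            default:
                throw new IllegalArgumentException("Unsupported timer operation: " + op);
        }
    }
}
```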







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 3f0036901cb06dd1f641d0e7243c6da486602b86 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12861) 
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12896) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569215043



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;

Review comment:
       Yes. I will remove the allowedLateness. 
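The Javadoc on `allowedLateness` lists two uses: dropping late elements, and clearing window state once time passes `window.maxTimestamp + allowedLateness`. The author agrees to remove the field. For reference, here is a minimal sketch of those two checks. It assumes event time, a window covering `[start, end)`, and a non-negative lateness. The names are hypothetical and only loosely modeled on Flink's window operators.

```java
/** Hypothetical helpers mirroring the two uses of allowedLateness described in the Javadoc. */
final class LatenessSketch {
    /** Largest timestamp that still belongs to a window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Time at which window state may be cleared; saturates at Long.MAX_VALUE on overflow. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long maxTs = maxTimestamp(windowEnd);
        long cleanup = maxTs + allowedLateness;
        // With non-negative lateness, an overflowing sum wraps below maxTs: treat it as "never".
        return cleanup >= maxTs ? cleanup : Long.MAX_VALUE;
    }

    /** A window is too late for new elements once the watermark reaches its cleanup time. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowEnd, allowedLateness) <= currentWatermark;
    }
}
```

With zero lateness, the window `[0, 5000)` becomes late as soon as the watermark reaches 4999.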







[GitHub] [flink] HuangXingBo closed pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo closed pull request #14775:
URL: https://github.com/apache/flink/pull/14775


   






[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569216546



##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Tests for {@link PythonStreamGroupWindowAggregateOperator}. These tests check that:
+ *
+ * <ul>
+ *   <li>The retraction flag is handled correctly
+ *   <li>FinishBundle is called when a checkpoint is encountered
+ *   <li>FinishBundle is called when the bundled element count reaches the max bundle size
+ *   <li>FinishBundle is called when the bundle time reaches the max bundle time
+ *   <li>Watermarks are buffered and only sent downstream when finishBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(10000L));
+        // a checkpoint triggers finishBundle
+        testHarness.prepareSnapshotPreBarrier(0L);
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByCount() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+
+        testHarness.processWatermark(new Watermark(10000L));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByTime() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(20000L));
+        assertOutputEquals(
+                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
+
+        testHarness.setProcessingTime(1000L);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    @Override
+    public LogicalType[] getOutputLogicalType() {
+        return new LogicalType[] {
+            DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()
+        };
+    }
+
+    @Override
+    public RowType getInputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new VarCharType()),
+                        new RowType.RowField("f3", new BigIntType()),
+                        new RowType.RowField("rowTime", new BigIntType())));
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new BigIntType()),
+                        new RowType.RowField("windowStart", new TimestampType(3)),
+                        new RowType.RowField("windowEnd", new TimestampType(3))));
+    }
+
+    @Override
+    OneInputStreamOperator getTestOperator(Configuration config) {
+        long size = 10000L;
+        long slide = 5000L;
+        SlidingWindowAssigner windowAssigner =
+                SlidingWindowAssigner.of(Duration.ofMillis(size), Duration.ofMillis(slide))
+                        .withEventTime();
+        PlannerWindowReference windowRef =
+                new PlannerWindowReference("w$", new Some<>(new TimestampType(3)));
+        LogicalWindow window =
+                new SlidingGroupWindow(
+                        windowRef,
+                        new FieldReferenceExpression(
+                                "rowtime",
+                                new AtomicDataType(
+                                        new TimestampType(true, TimestampKind.ROWTIME, 3)),
+                                0,
+                                3),
+                        intervalOfMillis(size),
+                        intervalOfMillis(slide));
+        return new PassThroughPythonStreamGroupWindowAggregateOperator(
+                config,
+                getInputType(),
+                getOutputType(),
+                new PythonAggregateFunctionInfo[] {
+                    new PythonAggregateFunctionInfo(
+                            PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE,
+                            new Integer[] {2},
+                            -1,
+                            false)
+                },
+                getGrouping(),
+                -1,
+                false,
+                false,
+                3,
+                windowAssigner,
+                window,
+                0,
+                new int[] {0, 1});
+    }
+
+    /** PassThroughPythonStreamGroupWindowAggregateOperator. */
+    public static class PassThroughPythonStreamGroupWindowAggregateOperator<K>

Review comment:
       Yes, it is really hard to read.

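The expected window bounds in the tests above come from the sliding assigner built in `getTestOperator` (size 10000 ms, slide 5000 ms, no offset), so every timestamp falls into `size / slide = 2` windows. The sketch below reproduces that arithmetic in plain Java so the expected `[start, end)` pairs can be checked by hand. `SlidingWindowSketch` and `assignWindows` are hypothetical names, not Flink API. The start computation is the common floor-modulo formulation and is not copied from Flink's `SlidingWindowAssigner`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists the sliding windows [start, end) that contain a timestamp. */
class SlidingWindowSketch {

    /** Returns {start, end} pairs, ordered by ascending start. */
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(0, new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10000L;
        long slide = 5000L;
        // Row times used for key "c1" in the tests.
        for (long ts : new long[] {0L, 6000L, 10000L}) {
            StringBuilder sb = new StringBuilder(ts + " ->");
            for (long[] w : assignWindows(ts, size, slide, 0L)) {
                sb.append(" [").append(w[0]).append(", ").append(w[1]).append(")");
            }
            System.out.println(sb);
        }
    }
}
```

Running `main` prints `0 -> [-5000, 5000) [0, 10000)`, `6000 -> [0, 10000) [5000, 15000)` and `10000 -> [5000, 15000) [10000, 20000)`. Together these are exactly the four windows that appear for key `c1` in the expected output.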

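The three `testFinishBundleTriggered*` tests above exercise three ways a bundle is flushed to the Python worker: by element count (`PythonOptions.MAX_BUNDLE_SIZE`), by elapsed processing time (`PythonOptions.MAX_BUNDLE_TIME_MILLS`), and on a checkpoint (`prepareSnapshotPreBarrier`). The sketch below models only that decision logic and assumes the time limit is measured from the last flush. `BundleTriggerSketch` and its methods are hypothetical and are not the operator's actual code.

```java
/**
 * Hypothetical model of when a buffered bundle is finished (flushed). It only counts
 * flushes; a real operator would also forward the buffered results downstream.
 */
class BundleTriggerSketch {
    private final int maxBundleSize;
    private final long maxBundleTimeMillis;
    private int pendingElements;
    private long lastFinishTime;
    private int finishedBundles;

    BundleTriggerSketch(int maxBundleSize, long maxBundleTimeMillis, long startTime) {
        this.maxBundleSize = maxBundleSize;
        this.maxBundleTimeMillis = maxBundleTimeMillis;
        this.lastFinishTime = startTime;
    }

    /** Count trigger: finish the bundle as soon as it holds maxBundleSize elements. */
    void onElement(long now) {
        pendingElements++;
        if (pendingElements >= maxBundleSize) {
            finishBundle(now);
        }
    }

    /** Time trigger: finish a non-empty bundle once the time limit has elapsed. */
    void onProcessingTime(long now) {
        if (pendingElements > 0 && now - lastFinishTime >= maxBundleTimeMillis) {
            finishBundle(now);
        }
    }

    /** Checkpoint trigger: flush any pending elements before the barrier is emitted. */
    void onCheckpoint(long now) {
        if (pendingElements > 0) {
            finishBundle(now);
        }
    }

    int finishedBundles() {
        return finishedBundles;
    }

    int pendingElements() {
        return pendingElements;
    }

    private void finishBundle(long now) {
        pendingElements = 0;
        lastFinishTime = now;
        finishedBundles++;
    }

    public static void main(String[] args) {
        // Mirrors testFinishBundleTriggeredByCount: bundle size 4, four elements.
        BundleTriggerSketch byCount = new BundleTriggerSketch(4, Long.MAX_VALUE, 0L);
        for (int i = 0; i < 4; i++) {
            byCount.onElement(0L);
        }
        System.out.println("byCount flushed: " + byCount.finishedBundles());

        // Mirrors testFinishBundleTriggeredByTime: bundle size 10, 1000 ms limit.
        BundleTriggerSketch byTime = new BundleTriggerSketch(10, 1000L, 0L);
        for (int i = 0; i < 4; i++) {
            byTime.onElement(0L);
        }
        System.out.println("byTime before 1000 ms: " + byTime.finishedBundles());
        byTime.onProcessingTime(1000L);
        System.out.println("byTime at 1000 ms: " + byTime.finishedBundles());
    }
}
```

With a size limit of 4, the fourth element closes the bundle, as in `testFinishBundleTriggeredByCount`. With a size limit of 10 and a 1000 ms limit, nothing is flushed until processing time reaches 1000, as in `testFinishBundleTriggeredByTime`.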





[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569213485



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The named properties of the window: 0 -> start of the window, 1 -> end of the window,
+     * 2 -> row time of the window, 3 -> proc time of the window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. */
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;
+
+    public PythonStreamGroupWindowAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            boolean countStarInserted,
+            int inputTimeFieldIndex,
+            WindowAssigner<W> windowAssigner,
+            LogicalWindow window,
+            long allowedLateness,
+            int[] namedProperties) {
+        super(
+                config,
+                inputType,
+                outputType,
+                aggregateFunctions,
+                dataViewSpecs,
+                grouping,
+                indexOfCountStar,
+                generateUpdateBefore);
+        this.countStarInserted = countStarInserted;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.windowAssigner = windowAssigner;
+        this.allowedLateness = allowedLateness;
+        this.namedProperties = namedProperties;
+        buildWindow(window);
+    }
+
+    @Override
+    public void open() throws Exception {
+        windowSerializer = windowAssigner.getWindowSerializer(new ExecutionConfig());
+        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
+        if (isTimeWindow) {

Review comment:
       Yes, makes sense.

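The `allowedLateness` Javadoc above says a window's state is cleared once time passes `window.maxTimestamp + allowedLateness`. For a `[start, end)` time window, `maxTimestamp` is `end - 1`. With the `allowedLateness` of `0` passed in `getTestOperator`, a watermark of `10000` therefore clears `[-5000, 5000)` and `[0, 10000)` but not `[5000, 15000)`. This is why, in the checkpoint- and count-triggered tests, the `state_cleanup_triggered` rows for the later windows only appear after the watermark advances to `20000`. The sketch below makes the arithmetic explicit. `WindowCleanupSketch` is a hypothetical name, and the overflow clamp to `Long.MAX_VALUE` is an assumption based on the usual convention in Flink's window operators, not code shown in this PR.

```java
/** Hypothetical sketch of the event-time cleanup rule described by the allowedLateness Javadoc. */
class WindowCleanupSketch {

    /** Time windows are [start, end), so the largest timestamp they contain is end - 1. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** maxTimestamp + allowedLateness, clamped to Long.MAX_VALUE if the addition overflows. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long max = maxTimestamp(windowEnd);
        long cleanup = max + allowedLateness;
        return cleanup >= max ? cleanup : Long.MAX_VALUE;
    }

    /** An event-time timer fires once the watermark reaches its timestamp. */
    static boolean isCleanedUp(long windowEnd, long allowedLateness, long watermark) {
        return cleanupTime(windowEnd, allowedLateness) <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 10000L;
        for (long end : new long[] {5000L, 10000L, 15000L, 20000L}) {
            System.out.println("end=" + end + " cleaned=" + isCleanedUp(end, 0L, watermark));
        }
    }
}
```

For the window ends used in the tests, `main` reports `true` for `5000` and `10000` and `false` for `15000` and `20000` at watermark `10000`.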






[GitHub] [flink] flinkbot commented on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 7df0cb198efffa5c5dd7d6cbe98e4db63f8e4269 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 84decbc592802aec616132d918fb8cf01f721d54 UNKNOWN
   * 2b22323c65b5932209ad2ff7757a3e0e4c8344ef Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13943) 
   





[GitHub] [flink] flinkbot commented on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768230921


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 7df0cb198efffa5c5dd7d6cbe98e4db63f8e4269 (Wed Jan 27 11:45:16 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20964).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12896) 
   





[GitHub] [flink] amaliujia commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569195501



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for {@link PythonStreamGroupAggregateOperator} and {@link
+ * PythonStreamGroupTableAggregateOperator}.
+ */
+@Internal
+public abstract class AbstractPythonStreamGroupAggregateOperator
+        extends AbstractPythonStreamAggregateOperator
+        implements Triggerable<RowData, VoidNamespace>, CleanupState {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The minimum time in milliseconds until state which was not updated will be retained. */
+    private final long minRetentionTime;
+
+    /** The maximum time in milliseconds until state which was not updated will be retained. */
+    private final long maxRetentionTime;
+
+    /**
+     * Indicates whether state cleaning is enabled. Can be calculated from the `minRetentionTime`.
+     */
+    private final boolean stateCleaningEnabled;
+
+    private transient TimerService timerService;
+
+    // holds the latest registered cleanup timer
+    private transient ValueState<Long> cleanupTimeState;
+
+    public AbstractPythonStreamGroupAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            long minRetentionTime,
+            long maxRetentionTime) {
+        super(
+                config,

Review comment:
       nit: the indentation is a bit weird. It seems `config` could go on the same line as `super(`?







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 3f0036901cb06dd1f641d0e7243c6da486602b86 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12861) 
   * 9cdc750c7db649b3ce4571c0d80fcdd3eb2acfdf UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * f2494304b2c16c02b89ee96864ff1e61f446f203 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13697) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 7df0cb198efffa5c5dd7d6cbe98e4db63f8e4269 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12554) 
   





[GitHub] [flink] WeiZhong94 commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r568352971



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
##########
@@ -76,8 +61,7 @@
  */

Review comment:
       The Javadoc needs to be updated.

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -147,6 +147,30 @@ message UserDefinedAggregateFunction {
   bool takes_row_as_input = 6;
 }
 
+message GroupWindow {
+  enum WindowType {
+    TUMBLING_GROUP_WINDOW = 0;
+    SLIDING_GROUP_WINDOW = 1;
+    SESSION_GROUP_WINDOW = 2;
+  }
+
+  WindowType window_type = 1;
+
+  bool is_time_window = 2;
+
+  int64 window_slide = 3;
+
+  int64 window_size = 4;
+

Review comment:
       Currently the gap of the session window is stored in the `window_size` field. I think we can add a new `window_gap` field to make this easier to understand.
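       To illustrate why a session gap is a different concept from a window size: a session window has no fixed length. Each element opens a candidate window of length `gap`, and overlapping candidates merge, so the final length depends on the data. The plain-Java sketch below is a standalone illustration, not the Flink or PyFlink implementation. It assumes the event timestamps are already sorted and treats windows that touch as overlapping (our understanding of Flink's `TimeWindow.intersects`); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: session windows are driven by a gap, not a fixed size. */
public class SessionGapSketch {

    /** A half-open event-time interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Assigns each timestamp the candidate window [ts, ts + gap) and merges it into the
     * previous session when the two touch or overlap. Assumes sortedTimestamps is ascending.
     */
    static List<Window> sessions(long[] sortedTimestamps, long gap) {
        List<Window> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            Window candidate = new Window(ts, ts + gap);
            if (!result.isEmpty() && result.get(result.size() - 1).end >= candidate.start) {
                // Extend the current session instead of starting a new one.
                Window last = result.remove(result.size() - 1);
                result.add(new Window(last.start, Math.max(last.end, candidate.end)));
            } else {
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // With a 5 s gap, 0 stays alone while 6000 and 10000 merge into one session.
        System.out.println(sessions(new long[] {0L, 6000L, 10000L}, 5000L));
        // prints [[0, 5000), [6000, 15000)]
    }
}
```

       Storing this value in a dedicated `window_gap` field would keep the "grow and merge" semantics above separate from the fixed `window_size` used by tumbling and sliding windows.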

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when the bundled element count reaches the max bundle size
+ *   <li>FinishBundle is called when the bundled time reaches the max bundle time
+ *   <li>Watermarks are buffered and only sent downstream when FinishBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(

Review comment:
       It would be better to extract a utility method that creates each of these stream records in one line.
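       For context on the expected output of this test: the first expected record for key `c1` carries a window start of `-5000` ms, even though the earliest `c1` input row has time 0. That is what overlapping sliding windows produce. Assuming the test's `SlidingGroupWindow` uses a 10 s size and a 5 s slide (an assumption; the window definition is outside this excerpt), a row at time 0 belongs to both [-5000, 5000) and [0, 10000). The plain-Java sketch below reproduces that assignment arithmetic, following the alignment formula we understand Flink's `TimeWindow.getWindowStartWithOffset` to use. It is not the operator's code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sliding-window assignment arithmetic. */
public class SlidingWindowSketch {

    /** Start of the slide-aligned window containing timestamp (assumes timestamp >= 0). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Returns every [start, end) sliding window containing timestamp, latest start first. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long ts : new long[] {0L, 6000L, 10000L}) {
            StringBuilder sb = new StringBuilder(ts + " ->");
            for (long[] w : assignWindows(ts, 10_000L, 5_000L)) {
                sb.append(" [").append(w[0]).append(", ").append(w[1]).append(')');
            }
            System.out.println(sb);
        }
        // prints:
        // 0 -> [0, 10000) [-5000, 5000)
        // 6000 -> [5000, 15000) [0, 10000)
        // 10000 -> [10000, 20000) [5000, 15000)
    }
}
```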

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These tests verify that:
+ *
+ * <ul>
+ *   <li>The retraction flag is handled correctly.
+ *   <li>FinishBundle is called when a checkpoint is encountered.
+ *   <li>FinishBundle is called when the bundled element count reaches the max bundle size.
+ *   <li>FinishBundle is called when the bundled time reaches the max bundle time.
+ *   <li>Watermarks are buffered and only sent downstream when finishBundle is triggered.
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(10000L));
+        // the checkpoint triggers finishBundle
+        testHarness.prepareSnapshotPreBarrier(0L);
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByCount() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+
+        testHarness.processWatermark(new Watermark(10000L));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByTime() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
+        testHarness.processWatermark(new Watermark(20000L));
+        assertOutputEquals(
+                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
+
+        testHarness.setProcessingTime(1000L);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    @Override
+    public LogicalType[] getOutputLogicalType() {
+        return new LogicalType[] {
+            DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()
+        };
+    }
+
+    @Override
+    public RowType getInputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new VarCharType()),
+                        new RowType.RowField("f3", new BigIntType()),
+                        new RowType.RowField("rowTime", new BigIntType())));
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new BigIntType()),
+                        new RowType.RowField("windowStart", new TimestampType(3)),
+                        new RowType.RowField("windowEnd", new TimestampType(3))));
+    }
+
+    @Override
+    OneInputStreamOperator getTestOperator(Configuration config) {
+        long size = 10000L;
+        long slide = 5000L;
+        SlidingWindowAssigner windowAssigner =
+                SlidingWindowAssigner.of(Duration.ofMillis(size), Duration.ofMillis(slide))
+                        .withEventTime();
+        PlannerWindowReference windowRef =
+                new PlannerWindowReference("w$", new Some<>(new TimestampType(3)));
+        LogicalWindow window =
+                new SlidingGroupWindow(
+                        windowRef,
+                        new FieldReferenceExpression(
+                                "rowtime",
+                                new AtomicDataType(
+                                        new TimestampType(true, TimestampKind.ROWTIME, 3)),
+                                0,
+                                3),
+                        intervalOfMillis(size),
+                        intervalOfMillis(slide));
+        return new PassThroughPythonStreamGroupWindowAggregateOperator(
+                config,
+                getInputType(),
+                getOutputType(),
+                new PythonAggregateFunctionInfo[] {
+                    new PythonAggregateFunctionInfo(
+                            PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE,
+                            new Integer[] {2},
+                            -1,
+                            false)
+                },
+                getGrouping(),
+                -1,
+                false,
+                false,
+                3,
+                windowAssigner,
+                window,
+                0,
+                new int[] {0, 1});
+    }
+
+    /** PassThroughPythonStreamGroupWindowAggregateOperator. */
+    public static class PassThroughPythonStreamGroupWindowAggregateOperator<K>

Review comment:
       It would be better to extract this class into its own Java file; it is quite large.
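For anyone checking the expected rows in the tests above: with `size = 10000` ms and `slide = 5000` ms, every element falls into two windows, which is why `c1`/`c2`/`c3` show up for both `[-5000, 5000)` and `[0, 10000)`. The following is a minimal sketch of that assignment, assuming offset 0 and half-open `[start, end)` windows, and assuming a window fires once the watermark reaches its max timestamp (`end - 1`). `SlidingWindowSketch`, `Window`, `assign` and `fires` are hypothetical helpers for illustration, not Flink's `SlidingWindowAssigner` or `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper modelling sliding event-time window assignment (offset 0). */
public class SlidingWindowSketch {

    /** A half-open window [start, end). Illustrative only; not Flink's TimeWindow. */
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp that still belongs to the window. */
        public long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Returns every window of length size, advanced by slide, that contains ts. */
    public static List<Window> assign(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Align to the latest window start that is <= ts (floorMod also handles negative ts).
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    /** Assumed firing rule: the watermark has reached the window's max timestamp. */
    public static boolean fires(Window w, long watermark) {
        return watermark >= w.maxTimestamp();
    }

    public static void main(String[] args) {
        System.out.println(assign(0L, 10000L, 5000L)); // [[0, 10000), [-5000, 5000)]
        System.out.println(assign(6000L, 10000L, 5000L)); // [[5000, 15000), [0, 10000)]
        System.out.println(fires(new Window(0L, 10000L), 10000L)); // true
        System.out.println(fires(new Window(5000L, 15000L), 10000L)); // false
    }
}
```

Under these assumptions, a watermark of 10000 fires only `[-5000, 5000)` and `[0, 10000)`, which matches the rows expected before the checkpoint in `testFinishBundleTriggeredOnCheckpoint`; the remaining windows only fire after the watermark advances to 20000.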

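The bundle behaviour listed in the test Javadoc (flush on element count, elapsed time, or checkpoint, and hold watermarks until the bundle finishes) can be modelled with a small single-threaded sketch. `BundleSketch` and all of its members are hypothetical; the real operator exchanges data with a Python worker asynchronously, so this only models the expected ordering of records and watermarks, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a bundle that flushes on size, time, or checkpoint. */
public class BundleSketch {
    private final int maxBundleSize;
    private final long maxBundleTimeMillis;
    private final List<String> bundle = new ArrayList<>();
    private long bundleStartTime;
    // Watermark waiting for the current bundle to finish; null if none is held.
    private Long heldWatermark;
    public final List<String> output = new ArrayList<>();

    public BundleSketch(int maxBundleSize, long maxBundleTimeMillis) {
        this.maxBundleSize = maxBundleSize;
        this.maxBundleTimeMillis = maxBundleTimeMillis;
    }

    public void processElement(String value, long now) {
        if (bundle.isEmpty()) {
            bundleStartTime = now; // a new bundle starts with its first element
        }
        bundle.add(value);
        if (bundle.size() >= maxBundleSize) {
            finishBundle(); // count trigger
        }
    }

    public void processWatermark(long watermark) {
        if (bundle.isEmpty()) {
            output.add("WM(" + watermark + ")"); // nothing pending: forward immediately
        } else {
            heldWatermark = watermark; // a newer watermark replaces an older held one
        }
    }

    public void onProcessingTime(long now) {
        if (!bundle.isEmpty() && now - bundleStartTime >= maxBundleTimeMillis) {
            finishBundle(); // time trigger
        }
    }

    public void prepareSnapshotPreBarrier() {
        finishBundle(); // checkpoint trigger
    }

    private void finishBundle() {
        output.addAll(bundle);
        bundle.clear();
        if (heldWatermark != null) {
            output.add("WM(" + heldWatermark + ")"); // release the watermark after the results
            heldWatermark = null;
        }
    }

    public static void main(String[] args) {
        BundleSketch s = new BundleSketch(10, 1000L);
        s.processElement("a", 0L);
        s.processElement("b", 1L);
        s.processWatermark(20000L);
        System.out.println(s.output); // []
        s.onProcessingTime(1000L);
        System.out.println(s.output); // [a, b, WM(20000)]
    }
}
```

This mirrors the shape of `testFinishBundleTriggeredByTime`: nothing is emitted after the watermark until processing time reaches the bundle time limit, and then the results are emitted before the held watermark.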
##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. */
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. */
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;
+
+    public PythonStreamGroupWindowAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            boolean countStarInserted,
+            int inputTimeFieldIndex,
+            WindowAssigner<W> windowAssigner,
+            LogicalWindow window,
+            long allowedLateness,
+            int[] namedProperties) {
+        super(
+                config,
+                inputType,
+                outputType,
+                aggregateFunctions,
+                dataViewSpecs,
+                grouping,
+                indexOfCountStar,
+                generateUpdateBefore);
+        this.countStarInserted = countStarInserted;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.windowAssigner = windowAssigner;
+        this.allowedLateness = allowedLateness;
+        this.namedProperties = namedProperties;
+        buildWindow(window);
+    }
+
+    @Override
+    public void open() throws Exception {
+        windowSerializer = windowAssigner.getWindowSerializer(new ExecutionConfig());
+        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
+        if (isTimeWindow) {

Review comment:
       How about adding some explanation of the structure of the timer data?
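One way to address this is a layout diagram kept next to the code that builds the timer data. The sketch below is hypothetical: it assumes a fixed binary layout of a 1-byte operation code, an 8-byte timestamp, and the window start and end as 8-byte longs. `TimerData` and its methods are illustrative names only. Judging from the imports, the operator actually builds its timer data as an `UpdatableRowData` (with `TinyIntType` and `BigIntType` fields, and presumably the key), so the real structure may differ.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical timer message layout (big-endian, 25 bytes):
 *
 * <pre>
 * offset 0  : operation code   (1 byte, e.g. REGISTER_EVENT_TIMER)
 * offset 1  : timer timestamp  (8 bytes)
 * offset 9  : window start     (8 bytes)
 * offset 17 : window end       (8 bytes)
 * </pre>
 */
final class TimerData {
    static final int SIZE_IN_BYTES = Byte.BYTES + 3 * Long.BYTES;

    final byte operation;
    final long timestamp;
    final long windowStart;
    final long windowEnd;

    TimerData(byte operation, long timestamp, long windowStart, long windowEnd) {
        this.operation = operation;
        this.timestamp = timestamp;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    /** Writes the fields in the order documented above. */
    byte[] toBytes() {
        return ByteBuffer.allocate(SIZE_IN_BYTES)
                .put(operation)
                .putLong(timestamp)
                .putLong(windowStart)
                .putLong(windowEnd)
                .array();
    }

    /** Reads the fields back in the same order; Java evaluates arguments left to right. */
    static TimerData fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new TimerData(buffer.get(), buffer.getLong(), buffer.getLong(), buffer.getLong());
    }
}
```

Keeping the diagram and the encode/decode pair in one place makes it harder for the Java and Python sides to drift apart.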

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when bundled element count reach to max bundle size
+ *   <li>FinishBundle is called when bundled time reach to max bundle time
+ *   <li>Watermarks are buffered and only sent to downstream when finishedBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(

Review comment:
       It would be better to extract a utility method that creates each of these stream records in one line.
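A minimal sketch of such a helper, using a stand-in record type so that the example compiles on its own. `TestRecord` and `TestRecords` are hypothetical names that approximate a `StreamRecord<RowData>` as an accumulate/retract flag, a field list, and a timestamp. In the real test the helper body would build Flink's record types instead, for example by wrapping `GenericRowData.ofKind(...)` in `new StreamRecord<>(row, timestamp)`; adapting it that way is an assumption, not code from this PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a StreamRecord<RowData>: a row plus its timestamp. */
final class TestRecord {
    final boolean accumulate;  // true = insert/accumulate message, false = retraction
    final List<Object> fields; // row fields in declaration order
    final long timestamp;      // record timestamp

    TestRecord(boolean accumulate, List<Object> fields, long timestamp) {
        this.accumulate = accumulate;
        this.fields = Collections.unmodifiableList(fields);
        this.timestamp = timestamp;
    }
}

/** One-line factories so that each test input reads as a single statement. */
final class TestRecords {
    private TestRecords() {}

    static TestRecord insertRecord(long timestamp, Object... fields) {
        return new TestRecord(true, Arrays.asList(fields), timestamp);
    }

    static TestRecord retractRecord(long timestamp, Object... fields) {
        return new TestRecord(false, Arrays.asList(fields), timestamp);
    }
}
```

With such a helper, each input becomes a single call such as `insertRecord(0L, "c1", "c2", 0L)`.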

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;

Review comment:
       If we do not plan to support early fire/late fire in the first version of the Python group window aggregate, the `allowedLateness` field is unnecessary.
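For context, the two uses listed in the field's Javadoc come down to a single cleanup time per window. The sketch below is modeled loosely on how Flink's window operators derive it (window max timestamp plus allowed lateness, saturating on overflow); `LatenessCheck` and its methods are hypothetical names. With `allowedLateness == 0`, which is what a first version without late firing would use, the cleanup time equals the window's max timestamp, so the extra field carries no information.

```java
/** Hypothetical helper mirroring the two uses of allowedLateness described in the Javadoc. */
final class LatenessCheck {
    private LatenessCheck() {}

    /** Time after which a window's state can be cleared: maxTimestamp + allowedLateness. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // On overflow the sum wraps below maxTimestamp; treat that as "never clean up".
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    /** An event-time window is late (elements dropped, state cleared) once the watermark reaches its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```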

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;

Review comment:
       ditto

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+    private UpdatableRowData reuseTimerData;

Review comment:
       This field can also be transient.
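A minimal sketch of why this matters, assuming a simplified operator class: `SketchOperator` is hypothetical and round-trips itself through plain Java serialization. Fields derived in `open()`, such as a reusable timer row or a key length, do not need to be serialized with the operator. Marking them `transient` keeps the serialized form smaller and makes it explicit that `open()` must rebuild them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical operator skeleton: configuration is serialized, runtime state is rebuilt in open(). */
final class SketchOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    final int[] grouping; // set in the constructor, so it must be serialized

    transient int keyLength;           // derived in open()
    transient Object[] reuseTimerData; // reusable buffer, rebuilt in open()

    SketchOperator(int[] grouping) {
        this.grouping = grouping;
    }

    void open() {
        keyLength = grouping.length;
        // Hypothetical layout: operation type, timestamp, then the key fields.
        reuseTimerData = new Object[2 + keyLength];
    }

    /** Serializes and deserializes the operator, as happens when it is shipped to a task. */
    static SketchOperator roundTrip(SketchOperator op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchOperator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```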

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;

Review comment:
       Why not 3?
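Assuming the codes are only ever compared for equality (they cannot be bit flags, since one of them is 0), contiguous values 0 to 3 are the natural choice. The sketch below shows one way to make that explicit. It uses explicit codes rather than `ordinal()`, so reordering the constants cannot silently change the wire format. `TimerOperation` is a hypothetical name, and the Python side would presumably need matching values.

```java
/** Hypothetical enum pinning each timer operation to a contiguous, explicit byte code. */
enum TimerOperation {
    REGISTER_EVENT_TIMER((byte) 0),
    REGISTER_PROCESSING_TIMER((byte) 1),
    DELETE_EVENT_TIMER((byte) 2),
    DELETE_PROCESSING_TIMER((byte) 3);

    private final byte code;

    TimerOperation(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    /** Decodes a byte received from the other side, rejecting unknown codes. */
    static TimerOperation fromCode(byte code) {
        for (TimerOperation op : values()) {
            if (op.code == code) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown timer operation code: " + code);
    }
}
```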







[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569215154



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;

Review comment:
       Good Catch
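The four `@VisibleForTesting` byte constants above act as opcodes for timer operations, so they must be distinct and both sides of the protocol must agree on them. As quoted, `DELETE_PROCESSING_TIMER` is `4` while the others run from `0` to `2`. The review comment does not say which line it refers to, so treat the following as a minimal sketch that assumes a contiguous `0..3` numbering was intended. `TimerOperation` and `fromCode` are hypothetical names, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: the timer opcodes from the diff modelled as an enum,
 * assuming contiguous values 0..3. Not Flink's actual implementation.
 */
enum TimerOperation {
    REGISTER_EVENT_TIMER((byte) 0),
    REGISTER_PROCESSING_TIMER((byte) 1),
    DELETE_EVENT_TIMER((byte) 2),
    DELETE_PROCESSING_TIMER((byte) 3);

    // Reverse lookup table, built once when the enum class is initialized.
    private static final Map<Byte, TimerOperation> BY_CODE = new HashMap<>();

    static {
        for (TimerOperation op : values()) {
            // put() returns the previous mapping, so a non-null result means two
            // constants share a code; fail fast instead of silently overwriting.
            if (BY_CODE.put(op.code, op) != null) {
                throw new IllegalStateException("Duplicate timer opcode: " + op.code);
            }
        }
    }

    final byte code;

    TimerOperation(byte code) {
        this.code = code;
    }

    /** Decodes an opcode byte, rejecting codes that map to no operation. */
    static TimerOperation fromCode(byte code) {
        TimerOperation op = BY_CODE.get(code);
        if (op == null) {
            throw new IllegalArgumentException("Unknown timer opcode: " + code);
        }
        return op;
    }
}
```

Keeping the codes in an enum with a reverse lookup makes a duplicated or unknown code fail loudly instead of silently mapping to the wrong timer action.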







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13819) 
   * 84decbc592802aec616132d918fb8cf01f721d54 UNKNOWN
   * 2b22323c65b5932209ad2ff7757a3e0e4c8344ef Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13943) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13819) 
   * 84decbc592802aec616132d918fb8cf01f721d54 UNKNOWN
   * 2b22323c65b5932209ad2ff7757a3e0e4c8344ef UNKNOWN
   





[GitHub] [flink] HuangXingBo commented on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-786441629


   @WeiZhong94 Thanks a lot for the review. I have addressed the comment in the latest commit.





[GitHub] [flink] HuangXingBo commented on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-772491559


   @WeiZhong94 Thanks a lot for the review. I have addressed the comments in the latest commit.





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 7df0cb198efffa5c5dd7d6cbe98e4db63f8e4269 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12554) 
   * 3f0036901cb06dd1f641d0e7243c6da486602b86 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * f2494304b2c16c02b89ee96864ff1e61f446f203 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13697) 
   * d8f3c75d291d0050ee56f82aa019418718bf87a5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13732) 
   





[GitHub] [flink] amaliujia commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569194581



##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -147,6 +147,30 @@ message UserDefinedAggregateFunction {
   bool takes_row_as_input = 6;
 }
 
+message GroupWindow {
+  enum WindowType {
+    TUMBLING_GROUP_WINDOW = 0;
+    SLIDING_GROUP_WINDOW = 1;
+    SESSION_GROUP_WINDOW = 2;
+  }
+
+  WindowType window_type = 1;
+
+  bool is_time_window = 2;
+
+  int64 window_slide = 3;
+
+  int64 window_size = 4;
+

Review comment:
       +1
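The `window_size` and `window_slide` fields of the proposed `GroupWindow` message determine which windows an element falls into. The sketch below is a plain-Java illustration, not the generated `FlinkFnApi` class: `GroupWindowSketch`, `TimeRange` and `assignWindows` are hypothetical names, it only covers time windows (`is_time_window = true`) with a zero offset, and it leaves out session windows entirely. It treats a tumbling window as a sliding window whose slide equals its size, and uses `Math.floorMod` so negative timestamps are also handled:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java illustration of the window_type / window_size /
 * window_slide fields; not the generated protobuf class.
 */
final class GroupWindowSketch {

    enum WindowType { TUMBLING_GROUP_WINDOW, SLIDING_GROUP_WINDOW }

    /** Half-open time window [start, end). */
    static final class TimeRange {
        final long start;
        final long end;

        TimeRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Start of the aligned period of length {@code step} containing {@code timestamp}. */
    static long windowStart(long timestamp, long step) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestamp - Math.floorMod(timestamp, step);
    }

    /** Returns every window that an element with the given timestamp belongs to. */
    static List<TimeRange> assignWindows(
            WindowType type, long timestamp, long windowSize, long windowSlide) {
        // A tumbling window behaves like a sliding window whose slide equals its size.
        long slide = type == WindowType.TUMBLING_GROUP_WINDOW ? windowSize : windowSlide;
        List<TimeRange> windows = new ArrayList<>();
        // Walk backwards from the latest window start that can still cover the timestamp.
        for (long start = windowStart(timestamp, slide);
                start > timestamp - windowSize;
                start -= slide) {
            windows.add(new TimeRange(start, start + windowSize));
        }
        return windows;
    }
}
```

For example, `assignWindows(WindowType.SLIDING_GROUP_WINDOW, 7, 10, 5)` returns `[5, 15)` and `[0, 10)`, while the tumbling case with size `10` returns only `[0, 10)`.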







[GitHub] [flink] HuangXingBo commented on a change in pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r569218822



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for {@link PythonStreamGroupAggregateOperator} and {@link
+ * PythonStreamGroupTableAggregateOperator}.
+ */
+@Internal
+public abstract class AbstractPythonStreamGroupAggregateOperator
+        extends AbstractPythonStreamAggregateOperator
+        implements Triggerable<RowData, VoidNamespace>, CleanupState {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The minimum time in milliseconds until state which was not updated will be retained. */
+    private final long minRetentionTime;
+
+    /** The maximum time in milliseconds until state which was not updated will be retained. */
+    private final long maxRetentionTime;
+
+    /**
+     * Indicates whether state cleaning is enabled. Can be calculated from the `minRetentionTime`.
+     */
+    private final boolean stateCleaningEnabled;
+
+    private transient TimerService timerService;
+
+    // holds the latest registered cleanup timer
+    private transient ValueState<Long> cleanupTimeState;
+
+    public AbstractPythonStreamGroupAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            long minRetentionTime,
+            long maxRetentionTime) {
+        super(
+                config,

Review comment:
       This is caused by the code style that Flink enforces with the google-java-format plugin.
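The `minRetentionTime`, `maxRetentionTime`, `stateCleaningEnabled` and `cleanupTimeState` fields in the diff above implement idle-state retention through a processing-time cleanup timer. The following is a minimal, self-contained sketch of that idea with hypothetical names: a `TreeSet` stands in for the `TimerService` and a nullable `Long` for the `ValueState<Long>`. The rule `minRetentionTime > 1` for enabling cleaning and the exact re-registration condition are assumptions, and Flink's own `CleanupState` code may differ:

```java
import java.util.TreeSet;

/** Hypothetical sketch of retention-based cleanup timers; not Flink's implementation. */
final class CleanupTimerSketch {
    private final long minRetentionTime;
    private final long maxRetentionTime;
    // Assumption: cleaning is only enabled for a meaningful minimum retention time.
    private final boolean stateCleaningEnabled;

    // Stand-in for the processing-time timers registered with a TimerService.
    final TreeSet<Long> registeredTimers = new TreeSet<>();
    // Stand-in for cleanupTimeState: the latest registered cleanup time, or null.
    Long cleanupTime;

    CleanupTimerSketch(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }

    /** Called for every record of a key; moves the cleanup deadline forward if needed. */
    void onRecord(long currentTime) {
        if (!stateCleaningEnabled) {
            return;
        }
        // Only re-register when the existing deadline would fire before the state
        // has been retained for minRetentionTime.
        if (cleanupTime == null || currentTime + minRetentionTime > cleanupTime) {
            long newCleanupTime = currentTime + maxRetentionTime;
            registeredTimers.add(newCleanupTime);
            if (cleanupTime != null) {
                registeredTimers.remove(cleanupTime);
            }
            cleanupTime = newCleanupTime;
        }
    }

    /** Called when a timer fires; returns true if the key's state should be cleared. */
    boolean onTimer(long timestamp) {
        registeredTimers.remove(timestamp);
        if (cleanupTime != null && cleanupTime == timestamp) {
            cleanupTime = null;
            return true;
        }
        return false;
    }
}
```

Re-registering only when `currentTime + minRetentionTime` passes the existing deadline keeps state alive for at least `minRetentionTime` and at most `maxRetentionTime` after its last update, without touching the timer on every record.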
   







[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13819) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13819) 
   * 84decbc592802aec616132d918fb8cf01f721d54 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * d8f3c75d291d0050ee56f82aa019418718bf87a5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13732) 
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14775: [FLINK-20964][python] Introduce PythonStreamGroupWindowAggregateOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14775:
URL: https://github.com/apache/flink/pull/14775#issuecomment-768234660


   ## CI report:
   
   * d8f3c75d291d0050ee56f82aa019418718bf87a5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13732) 
   * 751a2a17ffec64164dc4092ab32c6348bbe18774 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13819) 
   

