Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 09:46:53 UTC

[flink] branch master updated: [FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f8909cfb51 [FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
0f8909cfb51 is described below

commit 0f8909cfb5179cb9e87de2615404bb6dd12e359f
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon Sep 26 17:46:19 2022 +0800

    [FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
    
    This closes #20898
---
 .../nodes/exec/stream/StreamExecOverAggregate.java |   4 +-
 .../runtime/harness/OverAggregateHarnessTest.scala |   7 +-
 .../over/ProcTimeUnboundedPrecedingFunction.java   |  34 ++----
 .../ProcTimeUnboundedPrecedingFunctionTest.java    | 117 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 31 deletions(-)
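
The core of the change: instead of extending KeyedProcessFunctionWithCleanupState and registering a processing-time cleanup timer per key, the function now attaches a StateTtlConfig (built from the table's state retention time) to its state descriptor and lets the state backend expire idle per-key state. Below is a minimal sketch of that pattern using only the plain Flink state API, assuming a retention time in milliseconds; the class and helper names are illustrative and not part of this patch:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    /** Illustrative sketch: expire keyed state via TTL instead of cleanup timers. */
    public class TtlStateSketch {

        /** Roughly how a TTL config can be derived from a retention time. */
        static StateTtlConfig ttlConfig(long retentionMillis) {
            if (retentionMillis <= 0) {
                // retention disabled -> keep state forever, no TTL
                return StateTtlConfig.DISABLED;
            }
            return StateTtlConfig.newBuilder(Time.milliseconds(retentionMillis))
                    // refresh the TTL whenever the accumulator is created or updated
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    // never hand an expired accumulator back to the function
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
        }

        /** Attach the TTL config to the descriptor; the backend then handles cleanup. */
        static ValueStateDescriptor<Long> accDescriptor(StateTtlConfig ttlConfig) {
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("accState", Long.class);
            if (ttlConfig.isEnabled()) {
                descriptor.enableTimeToLive(ttlConfig);
            }
            return descriptor;
        }
    }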

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
index 0c8177269eb..17c74132219 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPreceding
 import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction;
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -335,8 +336,7 @@ public class StreamExecOverAggregate extends ExecNodeBase<RowData>
             }
         } else {
             return new ProcTimeUnboundedPrecedingFunction<>(
-                    config.getStateRetentionTime(),
-                    TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                    StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
                     genAggsHandler,
                     flattenAccTypes);
         }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index c165e30284e..cedfd412bd2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.harness
 
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
@@ -291,8 +290,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
 
     testHarness.open()
 
-    // register cleanup timer with 4003
-    testHarness.setProcessingTime(1003)
+    testHarness.setStateTtlProcessingTime(1003)
 
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
@@ -306,8 +304,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
 
-    // trigger cleanup timer and register cleanup timer with 8003
-    testHarness.setProcessingTime(5003)
+    testHarness.setStateTtlProcessingTime(5003)
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
     testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
index 6034a4f3434..c8e3de37ff8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.runtime.operators.over;
 
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
-import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
 import org.apache.flink.table.runtime.generated.AggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -40,9 +40,10 @@ import org.apache.flink.util.Collector;
  * UNBOUNDED preceding AND CURRENT ROW) FROM T.
  */
 public class ProcTimeUnboundedPrecedingFunction<K>
-        extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> {
-    private static final long serialVersionUID = 1L;
+        extends KeyedProcessFunction<K, RowData, RowData> {
+    private static final long serialVersionUID = 2L;
 
+    private final StateTtlConfig ttlConfig;
     private final GeneratedAggsHandleFunction genAggsHandler;
     private final LogicalType[] accTypes;
 
@@ -51,11 +52,10 @@ public class ProcTimeUnboundedPrecedingFunction<K>
     private transient JoinedRowData output;
 
     public ProcTimeUnboundedPrecedingFunction(
-            long minRetentionTime,
-            long maxRetentionTime,
+            StateTtlConfig ttlConfig,
             GeneratedAggsHandleFunction genAggsHandler,
             LogicalType[] accTypes) {
-        super(minRetentionTime, maxRetentionTime);
+        this.ttlConfig = ttlConfig;
         this.genAggsHandler = genAggsHandler;
         this.accTypes = accTypes;
     }
@@ -69,10 +69,11 @@ public class ProcTimeUnboundedPrecedingFunction<K>
 
         InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
         ValueStateDescriptor<RowData> stateDescriptor =
-                new ValueStateDescriptor<RowData>("accState", accTypeInfo);
+                new ValueStateDescriptor<>("accState", accTypeInfo);
+        if (ttlConfig.isEnabled()) {
+            stateDescriptor.enableTimeToLive(ttlConfig);
+        }
         accState = getRuntimeContext().getState(stateDescriptor);
-
-        initCleanupTimeState("ProcTimeUnboundedOverCleanupTime");
     }
 
     @Override
@@ -81,9 +82,6 @@ public class ProcTimeUnboundedPrecedingFunction<K>
             KeyedProcessFunction<K, RowData, RowData>.Context ctx,
             Collector<RowData> out)
             throws Exception {
-        // register state-cleanup timer
-        registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
-
         RowData accumulators = accState.value();
         if (null == accumulators) {
             accumulators = function.createAccumulators();
@@ -104,18 +102,6 @@ public class ProcTimeUnboundedPrecedingFunction<K>
         out.collect(output);
     }
 
-    @Override
-    public void onTimer(
-            long timestamp,
-            KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
-            Collector<RowData> out)
-            throws Exception {
-        if (stateCleaningEnabled) {
-            cleanupState(accState);
-            function.cleanup();
-        }
-    }
-
     @Override
     public void close() throws Exception {
         if (null != function) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java
new file mode 100644
index 00000000000..9327d319eac
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ProcTimeUnboundedPrecedingFunction}. */
+public class ProcTimeUnboundedPrecedingFunctionTest {
+    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(2_000);
+
+    private static GeneratedAggsHandleFunction aggsHandleFunction =
+            new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+                @Override
+                public AggsHandleFunction newInstance(ClassLoader classLoader) {
+                    return new SumAggsHandleFunction(1);
+                }
+            };
+
+    private LogicalType[] inputFieldTypes =
+            new LogicalType[] {
+                VarCharType.STRING_TYPE, new BigIntType(),
+            };
+
+    private LogicalType[] outputFieldTypes =
+            new LogicalType[] {
+                VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(),
+            };
+    private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
+
+    private RowDataKeySelector keySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
+    private TypeInformation<RowData> keyType = keySelector.getProducedType();
+
+    private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputFieldTypes);
+
+    @Test
+    public void testStateTtl() throws Exception {
+        ProcTimeUnboundedPrecedingFunction<RowData> function =
+                new ProcTimeUnboundedPrecedingFunction<>(ttlConfig, aggsHandleFunction, accTypes);
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(function);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.open();
+
+        AbstractKeyedStateBackend stateBackend =
+                (AbstractKeyedStateBackend) operator.getKeyedStateBackend();
+
+        assertThat(stateBackend.numKeyValueStateEntries())
+                .as("Initial state is not empty")
+                .isEqualTo(0);
+
+        // put some records
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement(insertRecord("key", 1L));
+        testHarness.processElement(insertRecord("key", 1L));
+        testHarness.processElement(insertRecord("key", 1L));
+
+        testHarness.setStateTtlProcessingTime(2001);
+        // by now the keyed state should have expired, so the aggregation restarts from scratch
+        testHarness.processElement(insertRecord("key", 1L));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("key", 1L, 1L));
+        expectedOutput.add(insertRecord("key", 1L, 2L));
+        expectedOutput.add(insertRecord("key", 1L, 3L));
+        // the `sum` result restarts after the state expired
+        expectedOutput.add(insertRecord("key", 1L, 1L));
+        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+    }
+
+    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+            KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
+    }
+}