You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/26 09:46:53 UTC
[flink] branch master updated: [FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0f8909cfb51 [FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
0f8909cfb51 is described below
commit 0f8909cfb5179cb9e87de2615404bb6dd12e359f
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon Sep 26 17:46:19 2022 +0800
[FLINK-29349][table-runtime] Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
This closes #20898
---
.../nodes/exec/stream/StreamExecOverAggregate.java | 4 +-
.../runtime/harness/OverAggregateHarnessTest.scala | 7 +-
.../over/ProcTimeUnboundedPrecedingFunction.java | 34 ++----
.../ProcTimeUnboundedPrecedingFunctionTest.java | 117 +++++++++++++++++++++
4 files changed, 131 insertions(+), 31 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
index 0c8177269eb..17c74132219 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPreceding
import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -335,8 +336,7 @@ public class StreamExecOverAggregate extends ExecNodeBase<RowData>
}
} else {
return new ProcTimeUnboundedPrecedingFunction<>(
- config.getStateRetentionTime(),
- TableConfigUtils.getMaxIdleStateRetentionTime(config),
+ StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
genAggsHandler,
flattenAccTypes);
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index c165e30284e..cedfd412bd2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -17,7 +17,6 @@
*/
package org.apache.flink.table.planner.runtime.harness
-import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
@@ -291,8 +290,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
testHarness.open()
- // register cleanup timer with 4003
- testHarness.setProcessingTime(1003)
+ testHarness.setStateTtlProcessingTime(1003)
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
@@ -306,8 +304,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
- // trigger cleanup timer and register cleanup timer with 8003
- testHarness.setProcessingTime(5003)
+ testHarness.setStateTtlProcessingTime(5003)
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
testHarness.processElement(new StreamRecord(binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
index 6034a4f3434..c8e3de37ff8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.runtime.operators.over;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
-import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -40,9 +40,10 @@ import org.apache.flink.util.Collector;
* UNBOUNDED preceding AND CURRENT ROW) FROM T.
*/
public class ProcTimeUnboundedPrecedingFunction<K>
- extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> {
- private static final long serialVersionUID = 1L;
+ extends KeyedProcessFunction<K, RowData, RowData> {
+ private static final long serialVersionUID = 2L;
+ private final StateTtlConfig ttlConfig;
private final GeneratedAggsHandleFunction genAggsHandler;
private final LogicalType[] accTypes;
@@ -51,11 +52,10 @@ public class ProcTimeUnboundedPrecedingFunction<K>
private transient JoinedRowData output;
public ProcTimeUnboundedPrecedingFunction(
- long minRetentionTime,
- long maxRetentionTime,
+ StateTtlConfig ttlConfig,
GeneratedAggsHandleFunction genAggsHandler,
LogicalType[] accTypes) {
- super(minRetentionTime, maxRetentionTime);
+ this.ttlConfig = ttlConfig;
this.genAggsHandler = genAggsHandler;
this.accTypes = accTypes;
}
@@ -69,10 +69,11 @@ public class ProcTimeUnboundedPrecedingFunction<K>
InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
ValueStateDescriptor<RowData> stateDescriptor =
- new ValueStateDescriptor<RowData>("accState", accTypeInfo);
+ new ValueStateDescriptor<>("accState", accTypeInfo);
+ if (ttlConfig.isEnabled()) {
+ stateDescriptor.enableTimeToLive(ttlConfig);
+ }
accState = getRuntimeContext().getState(stateDescriptor);
-
- initCleanupTimeState("ProcTimeUnboundedOverCleanupTime");
}
@Override
@@ -81,9 +82,6 @@ public class ProcTimeUnboundedPrecedingFunction<K>
KeyedProcessFunction<K, RowData, RowData>.Context ctx,
Collector<RowData> out)
throws Exception {
- // register state-cleanup timer
- registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
-
RowData accumulators = accState.value();
if (null == accumulators) {
accumulators = function.createAccumulators();
@@ -104,18 +102,6 @@ public class ProcTimeUnboundedPrecedingFunction<K>
out.collect(output);
}
- @Override
- public void onTimer(
- long timestamp,
- KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
- Collector<RowData> out)
- throws Exception {
- if (stateCleaningEnabled) {
- cleanupState(accState);
- function.cleanup();
- }
- }
-
@Override
public void close() throws Exception {
if (null != function) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java
new file mode 100644
index 00000000000..9327d319eac
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeUnboundedPrecedingFunctionTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ProcTimeUnboundedPrecedingFunction}. */
+public class ProcTimeUnboundedPrecedingFunctionTest {
+ StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(2_000);
+
+ private static GeneratedAggsHandleFunction aggsHandleFunction =
+ new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+ @Override
+ public AggsHandleFunction newInstance(ClassLoader classLoader) {
+ return new SumAggsHandleFunction(1);
+ }
+ };
+
+ private LogicalType[] inputFieldTypes =
+ new LogicalType[] {
+ VarCharType.STRING_TYPE, new BigIntType(),
+ };
+
+ private LogicalType[] outputFieldTypes =
+ new LogicalType[] {
+ VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(),
+ };
+ private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
+
+ private RowDataKeySelector keySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
+ private TypeInformation<RowData> keyType = keySelector.getProducedType();
+
+ private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputFieldTypes);
+
+ @Test
+ public void testStateTtl() throws Exception {
+ ProcTimeUnboundedPrecedingFunction<RowData> function =
+ new ProcTimeUnboundedPrecedingFunction<>(ttlConfig, aggsHandleFunction, accTypes);
+ KeyedProcessOperator<RowData, RowData, RowData> operator =
+ new KeyedProcessOperator<>(function);
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(operator);
+
+ testHarness.open();
+
+ AbstractKeyedStateBackend stateBackend =
+ (AbstractKeyedStateBackend) operator.getKeyedStateBackend();
+
+ assertThat(stateBackend.numKeyValueStateEntries())
+ .as("Initial state is not empty")
+ .isEqualTo(0);
+
+ // put some records
+ testHarness.setStateTtlProcessingTime(1);
+ testHarness.processElement(insertRecord("key", 1L));
+ testHarness.processElement(insertRecord("key", 1L));
+ testHarness.processElement(insertRecord("key", 1L));
+
+ testHarness.setStateTtlProcessingTime(2001);
+ // by now the TTL has elapsed and the state has expired, so the aggregate restarts from scratch
+ testHarness.processElement(insertRecord("key", 1L));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("key", 1L, 1L));
+ expectedOutput.add(insertRecord("key", 1L, 2L));
+ expectedOutput.add(insertRecord("key", 1L, 3L));
+ // the `sum` result restarts from the first value after the state has expired
+ expectedOutput.add(insertRecord("key", 1L, 1L));
+ assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
+ }
+}