You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/20 01:55:47 UTC

[flink] branch release-1.11 updated: [FLINK-18119][table-blink] Expire state automatically and accurately for time range bounded over aggregation

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 2cfaff8  [FLINK-18119][table-blink] Expire state automatically and accurately for time range bounded over aggregation
2cfaff8 is described below

commit 2cfaff87427e34e93160c56bb85b1fd8e68fb964
Author: Hyeonseop Lee <hy...@ls-al.me>
AuthorDate: Tue Jun 16 19:36:23 2020 +0900

    [FLINK-18119][table-blink] Expire state automatically and accurately for time range bounded over aggregation
    
    This changes the state expiration behavior of RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction. In the previous version, we used TableConfig.setIdleStateRetentionTime to clean up state after it had been idle for some time. However, a bounded over aggregation is just like a processing/event-time interval join or window aggregation: the state size should be bounded and stable. The operator should expire state automatically based on watermark and processing time wit [...]
    
    This closes #12680
---
 .../physical/stream/StreamExecOverAggregate.scala  |  4 -
 .../runtime/harness/OverWindowHarnessTest.scala    | 56 +------------
 .../ProcTimeRangeBoundedPrecedingFunction.java     | 45 +++++++---
 .../over/RowTimeRangeBoundedPrecedingFunction.java | 79 ++++++++----------
 .../ProcTimeRangeBoundedPrecedingFunctionTest.java | 96 ++++++++++++++++++++++
 .../RowTimeRangeBoundedPrecedingFunctionTest.java  | 96 ++++++++++++++++++++++
 6 files changed, 261 insertions(+), 115 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index 80fecb6..e76ef41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -422,8 +422,6 @@ class StreamExecOverAggregate(
           rowTimeIdx.get)
       } else {
         new RowTimeRangeBoundedPrecedingFunction(
-          tableConfig.getMinIdleStateRetentionTime,
-          tableConfig.getMaxIdleStateRetentionTime,
           genAggsHandler,
           flattenAccTypes,
           fieldTypes,
@@ -441,8 +439,6 @@ class StreamExecOverAggregate(
           precedingOffset)
       } else {
         new ProcTimeRangeBoundedPrecedingFunction(
-          tableConfig.getMinIdleStateRetentionTime,
-          tableConfig.getMaxIdleStateRetentionTime,
           genAggsHandler,
           flattenAccTypes,
           fieldTypes,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala
index 54c74dd..e05b1d2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverWindowHarnessTest.scala
@@ -177,13 +177,11 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
     testHarness.open()
 
-    // register cleanup timer with 3003
     testHarness.setProcessingTime(3)
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
@@ -194,7 +192,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "aaa", 2L: JLong, null)))
 
-    // trigger cleanup timer and register cleanup timer with 6003
     testHarness.setProcessingTime(3003)
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "aaa", 3L: JLong, null)))
@@ -205,7 +202,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "aaa", 4L: JLong, null)))
 
-    // register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
 
     testHarness.setProcessingTime(7002)
@@ -216,7 +212,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "bbb", 30L: JLong, null)))
 
-    // register cleanup timer with 14002
     testHarness.setProcessingTime(11002)
     testHarness.processElement(new StreamRecord(
       binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
@@ -258,7 +253,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       row(0L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      row(0L: JLong, "aaa", 7L: JLong, null, 7L: JLong, 7L: JLong)))
+      row(0L: JLong, "aaa", 7L: JLong, null, 5L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
       row(0L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
@@ -270,21 +265,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
 
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
 
-    // test for clean-up timer NPE
-    testHarness.setProcessingTime(20000)
-
-    // timer registered for 23000
-    testHarness.processElement(new StreamRecord(
-      binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
-
-    // update clean-up timer to 25500. Previous timer should not clean up
-    testHarness.setProcessingTime(22500)
-    testHarness.processElement(new StreamRecord(
-      binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
-
-    // 23000 clean-up timer should fire but not fail with an NPE
-    testHarness.setProcessingTime(23001)
-
     testHarness.close()
   }
 
@@ -409,7 +389,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
@@ -466,32 +445,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
 
     testHarness.processWatermark(19000)
 
-    // test cleanup
-    testHarness.setProcessingTime(1000)
-    testHarness.processWatermark(20000)
-
-    // check that state is removed after max retention time
-    testHarness.processElement(new StreamRecord(
-      binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
-    testHarness.setProcessingTime(2500)
-    testHarness.processElement(new StreamRecord(
-      binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
-    testHarness.processWatermark(20010) // compute output
-
-    testHarness.setProcessingTime(4499)
-    testHarness.setProcessingTime(4500)
-
-    // check that state is only removed if all data was processed
-    testHarness.processElement(new StreamRecord(
-      binaryrow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
-
-    testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
-
-    testHarness.processWatermark(20020) // schedule emission
-
-    testHarness.setProcessingTime(8499) // clean-up
-    testHarness.setProcessingTime(8500) // clean-up
-
     val result = dropWatermarks(testHarness.getOutput.toArray)
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -526,13 +479,6 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       row(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
-    expectedOutput.add(new StreamRecord(
-      row(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
-    expectedOutput.add(new StreamRecord(
-      row(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
-    expectedOutput.add(new StreamRecord(
-      row(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
-
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
index b1d43a1..de4deff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.table.data.JoinedRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
-import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
 import org.apache.flink.table.runtime.generated.AggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
@@ -57,7 +57,7 @@ import java.util.List;
  * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
  * FROM T.
  */
-public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> {
+public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunction<K, RowData, RowData> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(ProcTimeRangeBoundedPrecedingFunction.class);
@@ -70,17 +70,17 @@ public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFuncti
 	private transient ValueState<RowData> accState;
 	private transient MapState<Long, List<RowData>> inputState;
 
+	// the state which keeps the safe timestamp to cleanup states
+	private transient ValueState<Long> cleanupTsState;
+
 	private transient AggsHandleFunction function;
 	private transient JoinedRowData output;
 
 	public ProcTimeRangeBoundedPrecedingFunction(
-			long minRetentionTime,
-			long maxRetentionTime,
 			GeneratedAggsHandleFunction genAggsHandler,
 			LogicalType[] accTypes,
 			LogicalType[] inputFieldTypes,
 			long precedingTimeBoundary) {
-		super(minRetentionTime, maxRetentionTime);
 		this.genAggsHandler = genAggsHandler;
 		this.accTypes = accTypes;
 		this.inputFieldTypes = inputFieldTypes;
@@ -107,7 +107,11 @@ public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFuncti
 			new ValueStateDescriptor<RowData>("accState", accTypeInfo);
 		accState = getRuntimeContext().getState(stateDescriptor);
 
-		initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime");
+		ValueStateDescriptor<Long> cleanupTsStateDescriptor = new ValueStateDescriptor<>(
+			"cleanupTsState",
+			Types.LONG
+		);
+		this.cleanupTsState = getRuntimeContext().getState(cleanupTsStateDescriptor);
 	}
 
 	@Override
@@ -116,9 +120,6 @@ public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFuncti
 			KeyedProcessFunction<K, RowData, RowData>.Context ctx,
 			Collector<RowData> out) throws Exception {
 		long currentTime = ctx.timerService().currentProcessingTime();
-		// register state-cleanup timer
-		registerProcessingCleanupTimer(ctx, currentTime);
-
 		// buffer the event incoming event
 
 		// add current element to the window list of elements with corresponding timestamp
@@ -128,19 +129,39 @@ public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFuncti
 			rowList = new ArrayList<RowData>();
 			// register timer to process event once the current millisecond passed
 			ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+			registerCleanupTimer(ctx, currentTime);
 		}
 		rowList.add(input);
 		inputState.put(currentTime, rowList);
 	}
 
+	private void registerCleanupTimer(
+			KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+			long timestamp) throws Exception {
+		// calculate safe timestamp to cleanup states
+		long minCleanupTimestamp = timestamp + precedingTimeBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (precedingTimeBoundary * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			// TODO Use timer with namespace to distinguish timers
+			ctx.timerService().registerProcessingTimeTimer(maxCleanupTimestamp);
+			cleanupTsState.update(maxCleanupTimestamp);
+		}
+	}
+
 	@Override
 	public void onTimer(
 			long timestamp,
 			KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
 			Collector<RowData> out) throws Exception {
-		if (needToCleanupState(timestamp)) {
-			// clean up and return
-			cleanupState(inputState, accState);
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			accState.clear();
+			cleanupTsState.clear();
 			function.cleanup();
 			return;
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
index e23fc62..51d1122 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.table.data.JoinedRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
-import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
 import org.apache.flink.table.runtime.generated.AggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
@@ -57,10 +56,10 @@ import java.util.List;
  * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
  * FROM T.
  */
-public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> {
+public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunction<K, RowData, RowData> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRowsBoundedPrecedingFunction.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRangeBoundedPrecedingFunction.class);
 
 	private final GeneratedAggsHandleFunction genAggsHandler;
 	private final LogicalType[] accTypes;
@@ -76,6 +75,9 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 	// the state which used to materialize the accumulator for incremental calculation
 	private transient ValueState<RowData> accState;
 
+	// the state which keeps the safe timestamp to cleanup states
+	private transient ValueState<Long> cleanupTsState;
+
 	// the state which keeps all the data that are not expired.
 	// The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
 	// the second element of tuple is a list that contains the entire data of all the rows belonging
@@ -85,14 +87,11 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 	private transient AggsHandleFunction function;
 
 	public RowTimeRangeBoundedPrecedingFunction(
-			long minRetentionTime,
-			long maxRetentionTime,
 			GeneratedAggsHandleFunction genAggsHandler,
 			LogicalType[] accTypes,
 			LogicalType[] inputFieldTypes,
 			long precedingOffset,
 			int rowTimeIdx) {
-		super(minRetentionTime, maxRetentionTime);
 		Preconditions.checkNotNull(precedingOffset);
 		this.genAggsHandler = genAggsHandler;
 		this.accTypes = accTypes;
@@ -126,7 +125,11 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 			rowListTypeInfo);
 		inputState = getRuntimeContext().getMapState(inputStateDesc);
 
-		initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime");
+		ValueStateDescriptor<Long> cleanupTsStateDescriptor = new ValueStateDescriptor<>(
+			"cleanupTsState",
+			Types.LONG
+		);
+		this.cleanupTsState = getRuntimeContext().getState(cleanupTsStateDescriptor);
 	}
 
 	@Override
@@ -134,9 +137,6 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 			RowData input,
 			KeyedProcessFunction<K, RowData, RowData>.Context ctx,
 			Collector<RowData> out) throws Exception {
-		// register state-cleanup timer
-		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
-
 		// triggering timestamp for trigger calculation
 		long triggeringTs = input.getLong(rowTimeIdx);
 
@@ -158,6 +158,23 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 				// register event time timer
 				ctx.timerService().registerEventTimeTimer(triggeringTs);
 			}
+			registerCleanupTimer(ctx, triggeringTs);
+		}
+	}
+
+	private void registerCleanupTimer(
+			KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+			long timestamp) throws Exception {
+		// calculate safe timestamp to cleanup states
+		long minCleanupTimestamp = timestamp + precedingOffset + 1;
+		long maxCleanupTimestamp = timestamp + (long) (precedingOffset * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			// TODO Use timer with namespace to distinguish timers
+			ctx.timerService().registerEventTimeTimer(maxCleanupTimestamp);
+			cleanupTsState.update(maxCleanupTimestamp);
 		}
 	}
 
@@ -166,37 +183,14 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 			long timestamp,
 			KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
 			Collector<RowData> out) throws Exception {
-		// register state-cleanup timer
-		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
-
-		if (isProcessingTimeTimer(ctx)) {
-			if (stateCleaningEnabled) {
-
-				Iterator<Long> keysIt = inputState.keys().iterator();
-				Long lastProcessedTime = lastTriggeringTsState.value();
-				if (lastProcessedTime == null) {
-					lastProcessedTime = 0L;
-				}
-
-				// is data left which has not been processed yet?
-				boolean noRecordsToProcess = true;
-				while (keysIt.hasNext() && noRecordsToProcess) {
-					if (keysIt.next() > lastProcessedTime) {
-						noRecordsToProcess = false;
-					}
-				}
-
-				if (noRecordsToProcess) {
-					// we clean the state
-					cleanupState(inputState, accState, lastTriggeringTsState);
-					function.cleanup();
-				} else {
-					// There are records left to process because a watermark has not been received yet.
-					// This would only happen if the input stream has stopped. So we don't need to clean up.
-					// We leave the state as it is and schedule a new cleanup timer
-					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
-				}
-			}
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			accState.clear();
+			lastTriggeringTsState.clear();
+			cleanupTsState.clear();
+			function.cleanup();
 			return;
 		}
 
@@ -275,9 +269,6 @@ public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctio
 			accState.update(accumulators);
 		}
 		lastTriggeringTsState.update(timestamp);
-
-		// update cleanup timer
-		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
 	}
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
new file mode 100644
index 0000000..b570a98
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link ProcTimeRangeBoundedPrecedingFunction}.
+ */
+public class ProcTimeRangeBoundedPrecedingFunctionTest {
+
+	private static GeneratedAggsHandleFunction aggsHandleFunction =
+		new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+			@Override
+			public AggsHandleFunction newInstance(ClassLoader classLoader) {
+				return new SumAggsHandleFunction(1);
+			}
+		};
+
+	private LogicalType[] inputFieldTypes = new LogicalType[]{
+		new VarCharType(VarCharType.MAX_LENGTH),
+		new BigIntType(),
+	};
+	private LogicalType[] accTypes = new LogicalType[]{ new BigIntType() };
+
+	private BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new int[]{ 0 }, inputFieldTypes);
+	private TypeInformation<RowData> keyType = keySelector.getProducedType();
+
+	@Test
+	public void testStateCleanup() throws Exception {
+		ProcTimeRangeBoundedPrecedingFunction<RowData> function = new ProcTimeRangeBoundedPrecedingFunction<>(
+			aggsHandleFunction, accTypes, inputFieldTypes, 2000);
+		KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
+
+		OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+		testHarness.open();
+
+		AbstractKeyedStateBackend stateBackend = (AbstractKeyedStateBackend) operator.getKeyedStateBackend();
+
+		assertEquals("Initial state is not empty", 0, stateBackend.numKeyValueStateEntries());
+
+		// put some records
+		testHarness.setProcessingTime(100);
+		testHarness.processElement(insertRecord("key", 1L));
+		testHarness.processElement(insertRecord("key", 1L));
+		testHarness.setProcessingTime(500);
+		testHarness.processElement(insertRecord("key", 1L));
+
+		testHarness.setProcessingTime(1000);
+		// at this moment we expect the function to have some records in state
+
+		testHarness.setProcessingTime(4000);
+		// at this moment the function should have cleaned up states
+
+		assertEquals("State has not been cleaned up", 0, stateBackend.numKeyValueStateEntries());
+	}
+
+	private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+			KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
+		return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
+	}
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
new file mode 100644
index 0000000..4c148ea9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link RowTimeRangeBoundedPrecedingFunction}.
+ */
+public class RowTimeRangeBoundedPrecedingFunctionTest {
+
+	private static GeneratedAggsHandleFunction aggsHandleFunction =
+		new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+			@Override
+			public AggsHandleFunction newInstance(ClassLoader classLoader) {
+				return new SumAggsHandleFunction(1);
+			}
+		};
+
+	private LogicalType[] inputFieldTypes = new LogicalType[]{
+		new VarCharType(VarCharType.MAX_LENGTH),
+		new BigIntType(),
+		new BigIntType()
+	};
+	private LogicalType[] accTypes = new LogicalType[]{ new BigIntType() };
+
+	private BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new int[]{ 0 }, inputFieldTypes);
+	private TypeInformation<RowData> keyType = keySelector.getProducedType();
+
+	@Test
+	public void testStateCleanup() throws Exception {
+		RowTimeRangeBoundedPrecedingFunction<RowData> function = new RowTimeRangeBoundedPrecedingFunction<>(
+			aggsHandleFunction, accTypes, inputFieldTypes, 2000, 2);
+		KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
+
+		OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+		testHarness.open();
+
+		AbstractKeyedStateBackend stateBackend = (AbstractKeyedStateBackend) operator.getKeyedStateBackend();
+
+		assertEquals("Initial state is not empty", 0, stateBackend.numKeyValueStateEntries());
+
+		// put some records
+		testHarness.processElement(insertRecord("key", 1L, 100L));
+		testHarness.processElement(insertRecord("key", 1L, 100L));
+		testHarness.processElement(insertRecord("key", 1L, 500L));
+
+		testHarness.processWatermark(new Watermark(1000L));
+		// at this moment we expect the function to have some records in state
+
+		testHarness.processWatermark(new Watermark(4000L));
+		// at this moment the function should have cleaned up states
+
+		assertEquals("State has not been cleaned up", 0, stateBackend.numKeyValueStateEntries());
+	}
+
+	private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+			KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
+		return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
+	}
+
+}