Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/29 10:25:07 UTC

[flink] branch master updated: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 592f3a9  [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
592f3a9 is described below

commit 592f3a9a299826efcf9ffa17c56221ac3904f021
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Sep 27 17:44:52 2018 +0200

    [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
    
    This closes #7188.
---
 docs/dev/stream/state/state.md                     |  46 ++++++
 .../flink/api/common/state/StateTtlConfig.java     |  81 +++++++++-
 .../tests/DataStreamStateTTLTestProgram.java       |  43 +-----
 .../flink/streaming/tests/TtlTestConfig.java       |  73 +++++++++
 .../streaming/tests/TtlVerifyUpdateFunction.java   |   4 +-
 .../tests/verify/TtlAggregatingStateVerifier.java  |   2 +-
 .../tests/verify/TtlFoldingStateVerifier.java      |   2 +-
 .../tests/verify/TtlListStateVerifier.java         |   2 +-
 .../tests/verify/TtlMapStateVerifier.java          |   2 +-
 .../tests/verify/TtlReducingStateVerifier.java     |   2 +-
 .../tests/verify/TtlValueStateVerifier.java        |   2 +-
 .../test-scripts/test_stream_state_ttl.sh          |   2 +-
 .../network/KvStateServerHandlerTest.java          |   5 +
 .../runtime/state/AbstractKeyedStateBackend.java   |  21 +++
 .../flink/runtime/state/KeyedStateBackend.java     |  17 +++
 .../flink/runtime/state/{heap => }/StateEntry.java |  31 +++-
 .../runtime/state/heap/AbstractHeapState.java      |   5 +
 .../runtime/state/heap/CopyOnWriteStateTable.java  | 164 ++++++++++++++++-----
 .../runtime/state/heap/NestedMapsStateTable.java   | 116 +++++++++++++++
 .../flink/runtime/state/heap/StateTable.java       |   3 +
 .../runtime/state/internal/InternalKvState.java    |  48 ++++++
 .../flink/runtime/state/ttl/AbstractTtlState.java  |  28 +++-
 .../runtime/state/ttl/TtlAggregatingState.java     |  23 +--
 .../flink/runtime/state/ttl/TtlFoldingState.java   |  21 ++-
 .../runtime/state/ttl/TtlIncrementalCleanup.java   | 102 +++++++++++++
 .../flink/runtime/state/ttl/TtlListState.java      |  39 ++++-
 .../flink/runtime/state/ttl/TtlMapState.java       |  31 +++-
 .../flink/runtime/state/ttl/TtlReducingState.java  |  20 ++-
 .../{TtlValueState.java => TtlStateContext.java}   |  46 +++---
 .../flink/runtime/state/ttl/TtlStateFactory.java   | 112 +++++++++-----
 .../flink/runtime/state/ttl/TtlValueState.java     |  20 ++-
 .../flink/runtime/query/KvStateRegistryTest.java   |   5 +
 .../state/heap/CopyOnWriteStateTableTest.java      |  53 ++++++-
 .../heap/StateTableSnapshotCompatibilityTest.java  |   1 +
 .../state/ttl/HeapAsyncSnapshotTtlStateTest.java   |   5 +
 .../state/ttl/HeapSyncSnapshotTtlStateTest.java    |   5 +
 .../runtime/state/ttl/StateBackendTestContext.java |   7 +-
 .../flink/runtime/state/ttl/TtlStateTestBase.java  | 101 ++++++++++++-
 .../state/ttl/mock/MockInternalKvState.java        |   5 +
 .../runtime/state/ttl/mock/MockStateBackend.java   |   7 +-
 .../streaming/state/AbstractRocksDBState.java      |   5 +
 .../state/RocksDBNativeMetricMonitorTest.java      |  12 +-
 42 files changed, 1112 insertions(+), 207 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 1654c9a..a7fced2 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -361,6 +361,8 @@ e.g. by calling `ValueState.value()`.
 <span class="label label-danger">Attention</span> This means that by default if expired state is not read, 
 it won't be removed, possibly leading to ever growing state. This might change in future releases. 
 
+##### Cleanup in full snapshot
+
 Additionally, you can activate the cleanup at the moment of taking the full state snapshot which 
 will reduce its size. The local state is not cleaned up under the current implementation 
 but it will not include the removed expired state in case of restoration from the previous snapshot.
@@ -394,6 +396,50 @@ val ttlConfig = StateTtlConfig
 
 This option is not applicable for the incremental checkpointing in the RocksDB state backend.
 
+##### Incremental cleanup
+
+Another option is to trigger cleanup of some state entries incrementally.
+The trigger can be a callback from each state access and/or each record processing.
+If this cleanup strategy is active for a certain state,
+the storage backend keeps a lazy global iterator for this state over all its entries.
+Every time incremental cleanup is triggered, the iterator is advanced.
+The traversed state entries are checked and expired ones are cleaned up.
+
+This feature can be activated in `StateTtlConfig`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupIncrementally(5, true)
+    .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupIncrementally(5, true)
+    .build
+{% endhighlight %}
+</div>
+</div>
+
+This strategy has two parameters. The first one is the number of state entries checked on each cleanup trigger.
+If the strategy is enabled, cleanup is always triggered on each state access.
+The second parameter defines whether to additionally trigger cleanup on each record processing.
+
+**Notes:**
+- If no access happens to the state or no records are processed, expired state will persist.
+- Time spent for the incremental cleanup increases record processing latency.
+- At the moment incremental cleanup is implemented only for the heap state backend. Setting it for RocksDB will have no effect.
+- If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys 
+while iterating because its implementation does not support concurrent modifications. 
+Enabling this feature then increases memory consumption. Asynchronous snapshotting does not have this problem.
+
 More strategies will be added in the future for cleaning up expired state automatically in the background.
 
 ### State in the Scala DataStream API
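As a rough end-to-end sketch of how the new option is meant to be used with a state descriptor (a hypothetical illustration: the descriptor name is made up, and StateDescriptor#enableTimeToLive is pre-existing API rather than part of this change):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.Types;

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        // check up to 5 entries per state access and additionally per processed record
        .cleanupIncrementally(5, true)
        .build();

    ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("text-state", Types.STRING);
    descriptor.enableTimeToLive(ttlConfig);
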
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 42eaea4..5547775 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -21,7 +21,9 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.EnumMap;
@@ -219,7 +221,44 @@ public class StateTtlConfig implements Serializable {
 		public Builder cleanupFullSnapshot() {
 			cleanupStrategies.strategies.put(
 				CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
-				new CleanupStrategies.CleanupStrategy() {  });
+				new CleanupStrategies.EmptyCleanupStrategy());
+			return this;
+		}
+
+		/**
+		 * Cleanup expired state incrementally by cleaning up the local state.
+		 *
+		 * <p>Upon every state access this cleanup strategy checks a number of state keys for expiration
+		 * and cleans up expired ones. It keeps a lazy iterator through all keys with relaxed consistency
+		 * if the backend supports it. This way all keys should be regularly checked and cleaned up eventually over time
+		 * if any state is constantly being accessed.
+		 *
+		 * <p>In addition to the incremental cleanup upon state access, it can also run per each processed record.
+		 * Caution: if there are a lot of registered states using this option,
+		 * they all will be iterated for every record to check if there is something to clean up.
+		 *
+		 * <p>Note: if no access happens to this state or no records are processed
+		 * in case of {@code runCleanupForEveryRecord}, expired state will persist.
+		 *
+		 * <p>Note: Time spent for the incremental cleanup increases record processing latency.
+		 *
+		 * <p>Note: At the moment incremental cleanup is implemented only for Heap state backend.
+		 * Setting it for RocksDB will have no effect.
+		 *
+		 * <p>Note: If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys
+		 * while iterating because its implementation does not support concurrent modifications.
+		 * Enabling this feature then increases memory consumption. Asynchronous snapshotting does not have this problem.
+		 *
+		 * @param cleanupSize max number of state entries to check for expiration upon each cleanup trigger (state access of any key)
+		 * @param runCleanupForEveryRecord run incremental cleanup per each processed record
+		 */
+		@Nonnull
+		public Builder cleanupIncrementally(
+			@Nonnegative int cleanupSize,
+			boolean runCleanupForEveryRecord) {
+			cleanupStrategies.strategies.put(
+				CleanupStrategies.Strategies.INCREMENTAL_CLEANUP,
+				new IncrementalCleanupStrategy(cleanupSize, runCleanupForEveryRecord));
 			return this;
 		}
 
@@ -256,7 +295,8 @@ public class StateTtlConfig implements Serializable {
 
 		/** Fixed strategies ordinals in {@code strategies} config field. */
 		enum Strategies {
-			FULL_STATE_SCAN_SNAPSHOT
+			FULL_STATE_SCAN_SNAPSHOT,
+			INCREMENTAL_CLEANUP
 		}
 
 		/** Base interface for cleanup strategies configurations. */
@@ -264,10 +304,47 @@ public class StateTtlConfig implements Serializable {
 
 		}
 
+		static class EmptyCleanupStrategy implements CleanupStrategy {
+			private static final long serialVersionUID = 1373998465131443873L;
+		}
+
 		final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);
 
 		public boolean inFullSnapshot() {
 			return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
 		}
+
+		@Nullable
+		public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+			return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+		}
+	}
+
+	/** Configuration of the incremental cleanup strategy. */
+	public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+		private static final long serialVersionUID = 3109278696501988780L;
+
+	/** Max number of state entries to check for expiration upon each cleanup trigger (state access of any key). */
+		private final int cleanupSize;
+
+		/** Whether to run incremental cleanup per each processed record. */
+		private final boolean runCleanupForEveryRecord;
+
+		private IncrementalCleanupStrategy(
+			int cleanupSize,
+			boolean runCleanupForEveryRecord) {
+			Preconditions.checkArgument(cleanupSize >= 0,
+				"Number of incrementally cleaned up state entries cannot be negative.");
+			this.cleanupSize = cleanupSize;
+			this.runCleanupForEveryRecord = runCleanupForEveryRecord;
+		}
+
+		public int getCleanupSize() {
+			return cleanupSize;
+		}
+
+		public boolean runCleanupForEveryRecord() {
+			return runCleanupForEveryRecord;
+		}
 	}
 }
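Given a ttlConfig built as in the docs example above, a minimal sketch of how a backend could consult the new strategy (assuming the pre-existing StateTtlConfig#getCleanupStrategies() accessor; variable names are illustrative):

    StateTtlConfig.IncrementalCleanupStrategy strategy =
        ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
    if (strategy != null) {
        // number of state entries to check per cleanup trigger
        int cleanupSize = strategy.getCleanupSize();
        // whether cleanup should additionally run for every processed record
        boolean perRecord = strategy.runCleanupForEveryRecord();
    }
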
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
index 1a572f3..5d55bec 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -19,10 +19,7 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -48,26 +45,6 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
  * </ul>
  */
 public class DataStreamStateTTLTestProgram {
-	private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
-		.key("update_generator_source.keyspace")
-		.defaultValue(100);
-
-	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
-		.key("update_generator_source.sleep_time")
-		.defaultValue(0L);
-
-	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
-		.key("update_generator_source.sleep_after_elements")
-		.defaultValue(0L);
-
-	private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
-		.key("state_ttl_verifier.ttl_milli")
-		.defaultValue(1000L);
-
-	private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
-		.key("report_stat.after_updates_num")
-		.defaultValue(200L);
-
 	public static void main(String[] args) throws Exception {
 		final ParameterTool pt = ParameterTool.fromArgs(args);
 
@@ -77,23 +54,17 @@ public class DataStreamStateTTLTestProgram {
 
 		final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
 
-		int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
-		long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
-			UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
-		long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
-			UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
-		Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(),
-			STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
-		long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(),
-			REPORT_STAT_AFTER_UPDATES_NUM.defaultValue());
-
-		StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(ttl).build();
+		TtlTestConfig config = TtlTestConfig.fromArgs(pt);
+		StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
+			.cleanupIncrementally(5, true)
+			.cleanupFullSnapshot()
+			.build();
 
 		env
-			.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
+			.addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
 			.name("TtlStateUpdateSource")
 			.keyBy(TtlStateUpdate::getKey)
-			.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum))
+			.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
 			.name("TtlVerifyUpdateFunction")
 			.addSink(new PrintSinkFunction<>())
 			.name("PrintFailedVerifications");
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlTestConfig.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlTestConfig.java
new file mode 100644
index 0000000..854db36
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlTestConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+class TtlTestConfig {
+	private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+		.key("update_generator_source.keyspace")
+		.defaultValue(100);
+
+	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+		.key("update_generator_source.sleep_time")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+		.key("update_generator_source.sleep_after_elements")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
+		.key("state_ttl_verifier.ttl_milli")
+		.defaultValue(1000L);
+
+	private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
+		.key("report_stat.after_updates_num")
+		.defaultValue(200L);
+
+	final int keySpace;
+	final long sleepAfterElements;
+	final long sleepTime;
+	final Time ttl;
+	final long reportStatAfterUpdatesNum;
+
+	private TtlTestConfig(int keySpace, long sleepAfterElements, long sleepTime, Time ttl, long reportStatAfterUpdatesNum) {
+		this.keySpace = keySpace;
+		this.sleepAfterElements = sleepAfterElements;
+		this.sleepTime = sleepTime;
+		this.ttl = ttl;
+		this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
+	}
+
+	static TtlTestConfig fromArgs(ParameterTool pt) {
+		int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
+		long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+			UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
+		long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
+			UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
+		Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(),
+			STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
+		long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(),
+			REPORT_STAT_AFTER_UPDATES_NUM.defaultValue());
+		return new TtlTestConfig(keySpace, sleepAfterElements, sleepTime, ttl, reportStatAfterUpdatesNum);
+	}
+}
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 250041d..f9e492e 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -64,9 +64,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * - emits verification context in case of failure
  */
 class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
-
 	private static final long serialVersionUID = 1L;
-
 	private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
 
 	@Nonnull
@@ -154,6 +152,8 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String
 	}
 
 	private static class UpdateStat implements Serializable {
+		private static final long serialVersionUID = -4557720969995878873L;
+
 		final long reportStatAfterUpdatesNum;
 		long updates = 0;
 		long prevUpdatesNum = 0;
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
index 8a62957..9a6c06a 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -34,7 +34,7 @@ import java.util.List;
 class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier<
 	AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> {
 	TtlAggregatingStateVerifier() {
-		super(new AggregatingStateDescriptor<>("TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE));
+		super(new AggregatingStateDescriptor<>(TtlAggregatingStateVerifier.class.getSimpleName(), AGG_FUNC, LongSerializer.INSTANCE));
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
index bcc8590..902792f 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -37,7 +37,7 @@ class TtlFoldingStateVerifier extends AbstractTtlStateVerifier<
 
 	TtlFoldingStateVerifier() {
 		super(new FoldingStateDescriptor<>(
-			"TtlFoldingStateVerifier", INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
+			TtlFoldingStateVerifier.class.getSimpleName(), INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
index 4aed98f..ad97238 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -34,7 +34,7 @@ import java.util.stream.StreamSupport;
 class TtlListStateVerifier extends AbstractTtlStateVerifier<
 	ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> {
 	TtlListStateVerifier() {
-		super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE));
+		super(new ListStateDescriptor<>(TtlListStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE));
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
index eeda78d..c3d9449 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -45,7 +45,7 @@ class TtlMapStateVerifier extends AbstractTtlStateVerifier<
 	}
 
 	TtlMapStateVerifier() {
-		super(new MapStateDescriptor<>("TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
+		super(new MapStateDescriptor<>(TtlMapStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE, StringSerializer.INSTANCE));
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
index cd33ed0..2358af2 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -34,7 +34,7 @@ class TtlReducingStateVerifier extends AbstractTtlStateVerifier<
 	ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> {
 	TtlReducingStateVerifier() {
 		super(new ReducingStateDescriptor<>(
-			"TtlReducingStateVerifier",
+			TtlReducingStateVerifier.class.getSimpleName(),
 			(ReduceFunction<Integer>) (value1, value2) -> value1 + value2,
 			IntSerializer.INSTANCE));
 	}
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
index d8bdfd4..02dccab 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -31,7 +31,7 @@ import java.util.List;
 class TtlValueStateVerifier
 	extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, ValueState<String>, String, String, String> {
 	TtlValueStateVerifier() {
-		super(new ValueStateDescriptor<>("TtlValueStateVerifier", StringSerializer.INSTANCE));
+		super(new ValueStateDescriptor<>(TtlValueStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE));
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
index 3dca45a..0d76f27 100755
--- a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
+++ b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
@@ -20,7 +20,7 @@
 source "$(dirname "$0")"/common.sh
 
 STATE_BACKEND_TYPE="${1:-file}"
-STATE_BACKEND_FILE_ASYNC="${2:-false}"
+STATE_BACKEND_FILE_ASYNC="${2:-true}"
 TTL="${3:-1000}"
 PRECISION="${4:-5}"
 PARALLELISM="${5-3}"
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index bfadb75..5ce889c 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -333,6 +333,11 @@ public class KvStateServerHandlerTest extends TestLogger {
 					}
 
 					@Override
+					public StateIncrementalVisitor<Integer, VoidNamespace, Long> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
 					public void clear() {
 
 					}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 7422e65..11b4c23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -38,6 +38,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,6 +62,9 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	/** The currently active key. */
 	private K currentKey;
 
+	/** Listeners to changes of keyed context ({@link #currentKey}). */
+	private final Set<KeySelectionListener<K>> keySelectionListeners;
+
 	/** The key group of the currently active key. */
 	private int currentKeyGroup;
 
@@ -115,6 +120,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		this.executionConfig = executionConfig;
 		this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
 		this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
+		this.keySelectionListeners = new HashSet<>();
 	}
 
 	private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
@@ -149,10 +155,25 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 */
 	@Override
 	public void setCurrentKey(K newKey) {
+		notifyKeySelected(newKey);
 		this.currentKey = newKey;
 		this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
 	}
 
+	private void notifyKeySelected(K newKey) {
+		keySelectionListeners.forEach(listener -> listener.keySelected(newKey));
+	}
+
+	@Override
+	public void registerKeySelectionListener(KeySelectionListener<K> listener) {
+		keySelectionListeners.add(listener);
+	}
+
+	@Override
+	public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
+		return keySelectionListeners.remove(listener);
+	}
+
 	/**
 	 * @see KeyedStateBackend
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 7ba14b3..997e68b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -106,4 +106,21 @@ public interface KeyedStateBackend<K>
 
 	@Override
 	void dispose();
+
+	/** The state backend will call {@link KeySelectionListener#keySelected} when the key context is switched, if supported. */
+	void registerKeySelectionListener(KeySelectionListener<K> listener);
+
+	/**
+	 * Stop calling the listener registered in {@link #registerKeySelectionListener}.
+	 *
+	 * @return true iff the listener was registered before.
+	 */
+	boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);
+
+	/** Listener that is called back when {@link #setCurrentKey} is called (i.e. when the key context changes). */
+	@FunctionalInterface
+	interface KeySelectionListener<K> {
+		/** Callback when key context is switched. */
+		void keySelected(K newKey);
+	}
 }
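A short sketch of how the new listener hook can be used to drive per-record cleanup (the helper method and the cleanupOneStep runnable are illustrative, not part of this commit):

    import org.apache.flink.runtime.state.KeyedStateBackend;

    static <K> KeyedStateBackend.KeySelectionListener<K> hookPerRecordCleanup(
            KeyedStateBackend<K> backend, Runnable cleanupOneStep) {
        // invoked whenever setCurrentKey() switches the key context, i.e. once per processed record
        KeyedStateBackend.KeySelectionListener<K> listener = newKey -> cleanupOneStep.run();
        backend.registerKeySelectionListener(listener);
        return listener; // keep the reference to deregisterKeySelectionListener(listener) later
    }
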
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateEntry.java
similarity index 65%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/StateEntry.java
index 8e29cb2..847ca83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateEntry.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.heap;
+package org.apache.flink.runtime.state;
 
 /**
- * Interface of entries in a state table. Entries are triple of key, namespace, and state.
+ * Interface of entries in a state backend. Entries are triples of key, namespace, and state.
  *
  * @param <K> type of key.
  * @param <N> type of namespace.
@@ -41,4 +41,31 @@ public interface StateEntry<K, N, S> {
 	 * Returns the state of this entry.
 	 */
 	S getState();
+
+	class SimpleStateEntry<K, N, S> implements StateEntry<K, N, S> {
+		private final K key;
+		private final N namespace;
+		private final S value;
+
+		public SimpleStateEntry(K key, N namespace, S value) {
+			this.key = key;
+			this.namespace = namespace;
+			this.value = value;
+		}
+
+		@Override
+		public K getKey() {
+			return key;
+		}
+
+		@Override
+		public N getNamespace() {
+			return namespace;
+		}
+
+		@Override
+		public S getState() {
+			return value;
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index e45b0be..61a46899 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -124,4 +124,9 @@ public abstract class AbstractHeapState<K, N, SV> implements InternalKvState<K,
 			return null;
 		}
 	}
+
+	@Override
+	public StateIncrementalVisitor<K, N, SV> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		return stateTable.getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 72c70bc..9c360da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateEntry;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -31,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -118,6 +122,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	private static final int MAXIMUM_CAPACITY = 1 << 30;
 
 	/**
+	 * Default capacity for a {@link CopyOnWriteStateTable}. Must be a power of two,
+	 * greater than {@code MINIMUM_CAPACITY} and less than {@code MAXIMUM_CAPACITY}.
+	 */
+	public static final int DEFAULT_CAPACITY = 1024;
+
+	/**
 	 * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table.
 	 */
 	private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
@@ -199,13 +209,13 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	private int modCount;
 
 	/**
-	 * Constructs a new {@code StateTable} with default capacity of 1024.
+	 * Constructs a new {@code StateTable} with default capacity of {@code DEFAULT_CAPACITY}.
 	 *
 	 * @param keyContext the key context.
 	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
 	 */
 	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
-		this(keyContext, metaInfo, 1024);
+		this(keyContext, metaInfo, DEFAULT_CAPACITY);
 	}
 
 	/**
@@ -1032,75 +1042,153 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 
 	// StateEntryIterator  ---------------------------------------------------------------------------------------------
 
+	@Override
+	public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		return new StateIncrementalVisitorImpl(recommendedMaxNumberOfReturnedRecords);
+	}
+
 	/**
-	 * Iterator over the entries in a {@link CopyOnWriteStateTable}.
+	 * Iterator over state entry chains in a {@link CopyOnWriteStateTable}.
 	 */
-	class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
-		private StateTableEntry<K, N, S>[] activeTable;
+	class StateEntryChainIterator implements Iterator<StateTableEntry<K, N, S>> {
+		StateTableEntry<K, N, S>[] activeTable;
 		private int nextTablePosition;
-		private StateTableEntry<K, N, S> nextEntry;
-		private int expectedModCount;
+		private final int maxTraversedTablePositions;
 
-		StateEntryIterator() {
+		StateEntryChainIterator() {
+			this(Integer.MAX_VALUE);
+		}
+
+		StateEntryChainIterator(int maxTraversedTablePositions) {
+			this.maxTraversedTablePositions = maxTraversedTablePositions;
 			this.activeTable = primaryTable;
 			this.nextTablePosition = 0;
-			this.expectedModCount = modCount;
-			this.nextEntry = getBootstrapEntry();
-			advanceIterator();
 		}
 
-		private StateTableEntry<K, N, S> advanceIterator() {
+		@Override
+		public boolean hasNext() {
+			return size() > 0 && (nextTablePosition < activeTable.length || activeTable == primaryTable);
+		}
 
-			StateTableEntry<K, N, S> entryToReturn = nextEntry;
-			StateTableEntry<K, N, S> next = entryToReturn.next;
+		@Override
+		public StateTableEntry<K, N, S> next() {
+			StateTableEntry<K, N, S> next;
+			// consider both sub-tables to cover the case of rehash
+			while (true) { // current is empty
+				// try get next in active table or
+				// iteration is done over primary and rehash table
+				// or primary was swapped with rehash when rehash is done
+				next = nextActiveTablePosition();
+				if (next != null ||
+					nextTablePosition < activeTable.length ||
+					activeTable == incrementalRehashTable ||
+					activeTable != primaryTable) {
+					return next;
+				} else {
+					// switch to rehash (empty if no rehash)
+					activeTable = incrementalRehashTable;
+					nextTablePosition = 0;
+				}
+			}
+		}
 
-			// consider both sub-tables tables to cover the case of rehash
-			while (next == null) {
+		private StateTableEntry<K, N, S> nextActiveTablePosition() {
+			StateTableEntry<K, N, S>[] tab = activeTable;
+			int traversedPositions = 0;
+			while (nextTablePosition < tab.length && traversedPositions < maxTraversedTablePositions) {
+				StateTableEntry<K, N, S> next = tab[nextTablePosition++];
+				if (next != null) {
+					return next;
+				}
+				traversedPositions++;
+			}
+			return null;
+		}
+	}
 
-				StateTableEntry<K, N, S>[] tab = activeTable;
+	/**
+	 * Iterator over state entries in a {@link CopyOnWriteStateTable} which does not tolerate concurrent modifications.
+	 */
+	class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {
 
-				while (nextTablePosition < tab.length) {
-					next = tab[nextTablePosition++];
+		private final StateEntryChainIterator chainIterator;
+		private StateTableEntry<K, N, S> nextEntry;
+		private final int expectedModCount;
 
-					if (next != null) {
-						nextEntry = next;
-						return entryToReturn;
-					}
-				}
+		StateEntryIterator() {
+			this.chainIterator = new StateEntryChainIterator();
+			this.expectedModCount = modCount;
+			this.nextEntry = getBootstrapEntry();
+			advanceIterator();
+		}
 
-				if (activeTable == incrementalRehashTable) {
-					break;
-				}
+		@Override
+		public boolean hasNext() {
+			return nextEntry != null;
+		}
 
-				activeTable = incrementalRehashTable;
-				nextTablePosition = 0;
+		@Override
+		public StateEntry<K, N, S> next() {
+			if (modCount != expectedModCount) {
+				throw new ConcurrentModificationException();
 			}
+			if (!hasNext()) {
+				throw new NoSuchElementException();
+			}
+			return advanceIterator();
+		}
 
+		StateTableEntry<K, N, S> advanceIterator() {
+			StateTableEntry<K, N, S> entryToReturn = nextEntry;
+			StateTableEntry<K, N, S> next = nextEntry.next;
+			if (next == null) {
+				next = chainIterator.next();
+			}
 			nextEntry = next;
 			return entryToReturn;
 		}
+	}
+
+	/**
+	 * Incremental visitor over state entries in a {@link CopyOnWriteStateTable}.
+	 */
+	class StateIncrementalVisitorImpl implements StateIncrementalVisitor<K, N, S> {
+
+		private final StateEntryChainIterator chainIterator;
+		private final Collection<StateEntry<K, N, S>> chainToReturn = new ArrayList<>(5);
+
+		StateIncrementalVisitorImpl(int recommendedMaxNumberOfReturnedRecords) {
+			chainIterator = new StateEntryChainIterator(recommendedMaxNumberOfReturnedRecords);
+		}
 
 		@Override
 		public boolean hasNext() {
-			return nextEntry != null;
+			return chainIterator.hasNext();
 		}
 
 		@Override
-		public StateTableEntry<K, N, S> next() {
-			if (modCount != expectedModCount) {
-				throw new ConcurrentModificationException();
+		public Collection<StateEntry<K, N, S>> nextEntries() {
+			if (!hasNext()) {
+				return null;
 			}
 
-			if (nextEntry == null) {
-				throw new NoSuchElementException();
+			chainToReturn.clear();
+			for (StateTableEntry<K, N, S> nextEntry = chainIterator.next();
+				 nextEntry != null;
+				 nextEntry = nextEntry.next) {
+				chainToReturn.add(nextEntry);
 			}
+			return chainToReturn;
+		}
 
-			return advanceIterator();
+		@Override
+		public void remove(StateEntry<K, N, S> stateEntry) {
+			CopyOnWriteStateTable.this.remove(stateEntry.getKey(), stateEntry.getNamespace());
 		}
 
 		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Read-only iterator");
+		public void update(StateEntry<K, N, S> stateEntry, S newValue) {
+			CopyOnWriteStateTable.this.put(stateEntry.getKey(), stateEntry.getNamespace(), newValue);
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 167d90f..f6b5557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -24,10 +24,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateEntry.SimpleStateEntry;
 import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -35,8 +38,11 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Stream;
@@ -185,6 +191,11 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 			.flatMap(namespaceSate -> namespaceSate.keySet().stream());
 	}
 
+	@Override
+	public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		return new StateEntryIterator();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private boolean containsKey(K key, int keyGroupIndex, N namespace) {
@@ -414,4 +425,109 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 			return filtered;
 		}
 	}
+
+	/**
+	 * Iterator over state entries in a {@link NestedMapsStateTable}.
+	 *
+	 * <p>The iterator keeps a snapshotted copy of key/namespace sets, available at the beginning of iteration.
+	 * While further iterating the copy, the iterator returns the actual state value from primary maps
+	 * While further iterating over the copy, the iterator returns the actual state value from the primary maps
+	 * if it still exists at that moment.
+	 * <p>Note: Usage of this iterator can have a heap memory consumption impact.
+	 */
+	class StateEntryIterator implements StateIncrementalVisitor<K, N, S>, Iterator<StateEntry<K, N, S>> {
+		private int keyGroupIndex;
+		private Iterator<Map.Entry<N, Map<K, S>>> namespaceIterator;
+		private Map.Entry<N, Map<K, S>> namespace;
+		private Iterator<Map.Entry<K, S>> keyValueIterator;
+		private StateEntry<K, N, S> nextEntry;
+		private StateEntry<K, N, S> lastReturnedEntry;
+
+		StateEntryIterator() {
+			keyGroupIndex = 0;
+			namespace = null;
+			keyValueIterator = null;
+			nextKeyIterator();
+		}
+
+		@Override
+		public boolean hasNext() {
+			nextKeyIterator();
+			return keyIteratorHasNext();
+		}
+
+		@Override
+		public Collection<StateEntry<K, N, S>> nextEntries() {
+			StateEntry<K, N, S> nextEntry = next();
+			return nextEntry == null ? Collections.emptyList() : Collections.singletonList(nextEntry);
+		}
+
+		@Override
+		public StateEntry<K, N, S> next() {
+			StateEntry<K, N, S> next = null;
+			if (hasNext()) {
+				next = nextEntry;
+			}
+			nextEntry = null;
+			lastReturnedEntry = next;
+			return next;
+		}
+
+		private void nextKeyIterator() {
+			while (!keyIteratorHasNext()) {
+				nextNamespaceIterator();
+				if (namespaceIteratorHasNext()) {
+					namespace = namespaceIterator.next();
+					keyValueIterator = new HashSet<>(namespace.getValue().entrySet()).iterator();
+				} else {
+					break;
+				}
+			}
+		}
+
+		private void nextNamespaceIterator() {
+			while (!namespaceIteratorHasNext()) {
+				while (keyGroupIndex < state.length && state[keyGroupIndex] == null) {
+					keyGroupIndex++;
+				}
+				if (keyGroupIndex < state.length && state[keyGroupIndex] != null) {
+					namespaceIterator = new HashSet<>(state[keyGroupIndex++].entrySet()).iterator();
+				} else {
+					break;
+				}
+			}
+		}
+
+		private boolean keyIteratorHasNext() {
+			while (nextEntry == null && keyValueIterator != null && keyValueIterator.hasNext()) {
+				Map.Entry<K, S> next = keyValueIterator.next();
+				Map<K, S> ns = state[keyGroupIndex - 1] == null ? null :
+					state[keyGroupIndex - 1].getOrDefault(namespace.getKey(), null);
+				S upToDateValue = ns == null ? null : ns.getOrDefault(next.getKey(), null);
+				if (upToDateValue != null) {
+					nextEntry = new SimpleStateEntry<>(next.getKey(), namespace.getKey(), upToDateValue);
+				}
+			}
+			return nextEntry != null;
+		}
+
+		private boolean namespaceIteratorHasNext() {
+			return namespaceIterator != null && namespaceIterator.hasNext();
+		}
+
+		@Override
+		public void remove() {
+			remove(lastReturnedEntry);
+		}
+
+		@Override
+		public void remove(StateEntry<K, N, S> stateEntry) {
+			state[keyGroupIndex - 1].get(stateEntry.getNamespace()).remove(stateEntry.getKey());
+		}
+
+		@Override
+		public void update(StateEntry<K, N, S> stateEntry, S newValue) {
+			state[keyGroupIndex - 1].get(stateEntry.getNamespace()).put(stateEntry.getKey(), newValue);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 565790e..7f1b9c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
 import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -166,6 +167,8 @@ public abstract class StateTable<K, N, S> implements StateSnapshotRestore {
 
 	public abstract Stream<K> getKeys(N namespace);
 
+	public abstract StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords);
+
 	// Meta data setter / getter and toString -----------------------------------------------------
 
 	public TypeSerializer<S> getStateSerializer() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
index 1310dd2..3c1ef2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateEntry;
+
+import java.util.Collection;
 
 /**
  * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
@@ -105,4 +108,49 @@ public interface InternalKvState<K, N, V> extends State {
 			final TypeSerializer<K> safeKeySerializer,
 			final TypeSerializer<N> safeNamespaceSerializer,
 			final TypeSerializer<V> safeValueSerializer) throws Exception;
+
+	/**
+	 * Get global visitor of state entries.
+	 *
+	 * @param recommendedMaxNumberOfReturnedRecords hint to the visitor not to exceed this number of returned records
+	 *                                              per {@code nextEntries} call; it can still be exceeded
+	 *                                              by some small constant.
+	 * @return global iterator over state entries
+	 */
+	StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords);
+
+	/**
+	 * The state entry visitor which supports remove and update of the last returned entries.
+	 *
+	 * <p>The visitor should tolerate concurrent modifications.
+	 * It might trade consistency for this tolerance and return duplicates or miss values created while visiting,
+	 * but it always returns state values which exist and are up to date at the moment of calling {@code nextEntries()}.
+	 */
+	interface StateIncrementalVisitor<K, N, V> {
+		/** Whether the visitor potentially has some next entries to return from {@code nextEntries()}. */
+		boolean hasNext();
+
+		/**
+		 * Return some next entries which are available at the moment.
+		 *
+		 * <p>If an empty collection is returned, it does not mean that the visitor is exhausted;
+		 * it means that the visitor has done some incremental work advancing and checking internal data structures.
+		 * Whether the visitor is finished has to be checked with the {@code hasNext()} method.
+		 *
+		 * <p>The returned collection and state values must not be changed internally
+		 * (there might be no defensive copies in {@code nextEntries()} for performance).
+		 * They have to be deeply copied if they are to be modified, e.g. with the {@code update()} method.
+		 */
+		Collection<StateEntry<K, N, V>> nextEntries();
+
+		void remove(StateEntry<K, N, V> stateEntry);
+
+		/**
+		 * Update the value of the given entry returned by the last {@code nextEntries()} call.
+		 *
+		 * @throws IllegalStateException if {@code nextEntries()} has never been called yet or the iteration is over.
+		 */
+		void update(StateEntry<K, N, V> stateEntry, V newValue);
+	}
 }
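A rough sketch of driving the new visitor from a cleanup routine (the isExpired predicate and method name are illustrative; the actual expiration check lives in the TTL state wrappers):

    import java.util.Collection;
    import java.util.function.Predicate;
    import org.apache.flink.runtime.state.StateEntry;
    import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;

    static <K, N, V> void cleanupSomeEntries(
            StateIncrementalVisitor<K, N, V> visitor, Predicate<V> isExpired) {
        if (!visitor.hasNext()) {
            return;
        }
        // nextEntries() returns roughly the recommended number of entries per call and may be empty or null
        Collection<StateEntry<K, N, V>> entries = visitor.nextEntries();
        if (entries == null) {
            return;
        }
        for (StateEntry<K, N, V> entry : entries) {
            if (isExpired.test(entry.getState())) {
                visitor.remove(entry);
            }
        }
    }
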
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
index 5d1af8a..72c71b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * Base class for TTL logic wrappers of state objects.
  *
@@ -39,9 +41,13 @@ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N,
 	implements InternalKvState<K, N, SV> {
 	private final TypeSerializer<SV> valueSerializer;
 
-	AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
-		super(original, config, timeProvider);
-		this.valueSerializer = valueSerializer;
+	/** This registered callback is to be called whenever state is accessed for read or write. */
+	final Runnable accessCallback;
+
+	AbstractTtlState(TtlStateContext<S, SV> ttlStateContext) {
+		super(ttlStateContext.original, ttlStateContext.config, ttlStateContext.timeProvider);
+		this.valueSerializer = ttlStateContext.valueSerializer;
+		this.accessCallback = ttlStateContext.accessCallback;
 	}
 
 	<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
@@ -82,5 +88,19 @@ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N,
 	@Override
 	public void clear() {
 		original.clear();
+		accessCallback.run();
+	}
+
+	/**
+	 * Check if state has expired or not and update it if it has partially expired.
+	 *
+	 * @return either non-expired (possibly updated) state or null if the state has expired.
+	 */
+	@Nullable
+	public abstract TTLSV getUnexpiredOrNull(@Nonnull TTLSV ttlValue);
+
+	@Override
+	public StateIncrementalVisitor<K, N, SV> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		throw new UnsupportedOperationException();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
index 94f489d..0eff635 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Collection;
 
 /**
@@ -39,26 +39,31 @@ class TtlAggregatingState<K, N, IN, ACC, OUT>
 	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
 
 	TtlAggregatingState(
-		InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<ACC> valueSerializer,
+		TtlStateContext<InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>, ACC> ttlStateContext,
 		TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
-		super(originalState, config, timeProvider, valueSerializer);
-		aggregateFunction.stateClear = originalState::clear;
-		aggregateFunction.updater = originalState::updateInternal;
+		super(ttlStateContext);
+		aggregateFunction.stateClear = ttlStateContext.original::clear;
+		aggregateFunction.updater = ttlStateContext.original::updateInternal;
 	}
 
 	@Override
 	public OUT get() throws Exception {
+		accessCallback.run();
 		return original.get();
 	}
 
 	@Override
 	public void add(IN value) throws Exception {
+		accessCallback.run();
 		original.add(value);
 	}
 
+	@Nullable
+	@Override
+	public TtlValue<ACC> getUnexpiredOrNull(@Nonnull TtlValue<ACC> ttlValue) {
+		return expired(ttlValue) ? null : ttlValue;
+	}
+
 	@Override
 	public void clear() {
 		original.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
index 4c64ae3..bf51bee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
@@ -19,10 +19,11 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * This class wraps folding state with TTL logic.
  *
@@ -35,24 +36,28 @@ import org.apache.flink.runtime.state.internal.InternalFoldingState;
 class TtlFoldingState<K, N, T, ACC>
 	extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalFoldingState<K, N, T, TtlValue<ACC>>>
 	implements InternalFoldingState<K, N, T, ACC> {
-	TtlFoldingState(
-		InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<ACC> valueSerializer) {
-		super(originalState, config, timeProvider, valueSerializer);
+	TtlFoldingState(TtlStateContext<InternalFoldingState<K, N, T, TtlValue<ACC>>, ACC> ttlStateContext) {
+		super(ttlStateContext);
 	}
 
 	@Override
 	public ACC get() throws Exception {
+		accessCallback.run();
 		return getInternal();
 	}
 
 	@Override
 	public void add(T value) throws Exception {
+		accessCallback.run();
 		original.add(value);
 	}
 
+	@Nullable
+	@Override
+	public TtlValue<ACC> getUnexpiredOrNull(@Nonnull TtlValue<ACC> ttlValue) {
+		return expired(ttlValue) ? null : ttlValue;
+	}
+
 	@Override
 	public void clear() {
 		original.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java
new file mode 100644
index 0000000..7336120
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import java.util.Collection;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param <K> type of state key
+ * @param <N> type of state namespace
+ */
+class TtlIncrementalCleanup<K, N, S> {
+	/** The global state entry iterator is advanced by {@code cleanupSize} entries per cleanup step. */
+	@Nonnegative
+	private final int cleanupSize;
+
+	/** The TTL state object used to check whether the currently iterated entry has expired. */
+	private AbstractTtlState<K, N, ?, S, ?> ttlState;
+
+	/** Global state entry iterator, advanced by {@code cleanupSize} entries upon each state access and/or record processing. */
+	private StateIncrementalVisitor<K, N, S> stateIterator;
+
+	/**
+	 * TtlIncrementalCleanup constructor.
+	 *
+	 * @param cleanupSize max number of queued keys to incrementally cleanup upon state access
+	 */
+	TtlIncrementalCleanup(@Nonnegative int cleanupSize) {
+		this.cleanupSize = cleanupSize;
+	}
+
+	void stateAccessed() {
+		initIteratorIfNot();
+		try {
+			runCleanup();
+		} catch (Throwable t) {
+			throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t);
+		}
+	}
+
+	private void initIteratorIfNot() {
+		if (stateIterator == null || !stateIterator.hasNext()) {
+			stateIterator = ttlState.original.getStateIncrementalVisitor(cleanupSize);
+		}
+	}
+
+	private void runCleanup() {
+		int entryNum = 0;
+		Collection<StateEntry<K, N, S>> nextEntries;
+		while (
+			entryNum < cleanupSize &&
+			stateIterator.hasNext() &&
+			!(nextEntries = stateIterator.nextEntries()).isEmpty()) {
+
+			for (StateEntry<K, N, S> state : nextEntries) {
+				S cleanState = ttlState.getUnexpiredOrNull(state.getState());
+				if (cleanState == null) {
+					stateIterator.remove(state);
+				} else if (cleanState != state.getState()) {
+					stateIterator.update(state, cleanState);
+				}
+			}
+
+			entryNum += nextEntries.size();
+		}
+	}
+
+	/**
+	 * As the TTL state wrapper depends on this class through the access callback,
+	 * it has to be set via this method after the wrapper has been constructed.
+	 */
+	public void setTtlState(@Nonnull AbstractTtlState<K, N, ?, S, ?> ttlState) {
+		this.ttlState = ttlState;
+	}
+
+	int getCleanupSize() {
+		return cleanupSize;
+	}
+}
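
The cleanup above only runs for state that has opted into the incremental strategy. A hedged user-level sketch of enabling it through the StateTtlConfig builder extended by this commit (the descriptor name, state type and TTL duration are illustrative):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    class IncrementalCleanupConfigSketch {
        // cleanupIncrementally(5, true): visit up to 5 state entries per state access and
        // additionally run the cleanup for every processed record (heap backend only).
        static ValueStateDescriptor<Long> descriptorWithIncrementalTtlCleanup() {
            StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(10))
                .cleanupIncrementally(5, true)
                .build();

            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }

With runCleanupForEveryRecord set to true, the factory additionally registers a key-selection listener (see the TtlStateFactory changes below), so the cleanup also advances whenever the operator switches to another key.
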
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index b384d76..919328f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,32 +42,34 @@ import java.util.NoSuchElementException;
 class TtlListState<K, N, T> extends
 	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
 	implements InternalListState<K, N, T> {
-	TtlListState(
-		InternalListState<K, N, TtlValue<T>> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<List<T>> valueSerializer) {
-		super(originalState, config, timeProvider, valueSerializer);
+	TtlListState(TtlStateContext<InternalListState<K, N, TtlValue<T>>, List<T>> ttlStateContext) {
+		super(ttlStateContext);
 	}
 
 	@Override
 	public void update(List<T> values) throws Exception {
+		accessCallback.run();
 		updateInternal(values);
 	}
 
 	@Override
 	public void addAll(List<T> values) throws Exception {
+		accessCallback.run();
 		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
 		original.addAll(withTs(values));
 	}
 
 	@Override
 	public Iterable<T> get() throws Exception {
+		accessCallback.run();
 		Iterable<TtlValue<T>> ttlValue = original.get();
 		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
 		if (updateTsOnRead) {
 			List<TtlValue<T>> collected = collect(ttlValue);
 			ttlValue = collected;
+			// the underlying state in the backend is already iterated by updateTs anyway;
+			// to avoid iterating the backend again in IteratorWithCleanup,
+			// the values are collected here and iterated in memory afterwards
 			updateTs(collected);
 		}
 		final Iterable<TtlValue<T>> finalResult = ttlValue;
@@ -87,10 +91,31 @@ class TtlListState<K, N, T> extends
 
 	@Override
 	public void add(T value) throws Exception {
+		accessCallback.run();
 		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
 		original.add(wrapWithTs(value));
 	}
 
+	@Nullable
+	@Override
+	public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
+		long currentTimestamp = timeProvider.currentTimestamp();
+		List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
+		TypeSerializer<TtlValue<T>> elementSerializer =
+			((ListSerializer<TtlValue<T>>) original.getValueSerializer()).getElementSerializer();
+		for (TtlValue<T> ttlValue : ttlValues) {
+			if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+				// we have to do the defensive copy to update the value
+				unexpired.add(elementSerializer.copy(ttlValue));
+			}
+		}
+		if (unexpired.isEmpty()) {
+			return null;
+		} else {
+			return ttlValues.size() == unexpired.size() ? ttlValues : unexpired;
+		}
+	}
+
 	@Override
 	public void clear() {
 		original.clear();
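
The list variant of getUnexpiredOrNull differs from the scalar states above: it drops expired elements individually, reuses the original list when nothing has expired, and returns null only when every element has expired so that the cleanup can remove the whole entry. A hedged, self-contained illustration of that per-element rule (the class, method and timestamp accessor are illustrative, not the Flink types):

    class ListExpirySketch {

        // An element is expired once lastAccessTimestamp + ttl <= now; survivors are kept,
        // the original list is reused if unchanged, and null signals "remove the entry".
        static <T> java.util.List<T> unexpiredOrNull(
                java.util.List<T> elements,
                java.util.function.ToLongFunction<T> lastAccessTimestamp,
                long ttl,
                long now) {

            java.util.List<T> unexpired = new java.util.ArrayList<>(elements.size());
            for (T element : elements) {
                if (lastAccessTimestamp.applyAsLong(element) + ttl > now) {
                    unexpired.add(element);
                }
            }
            if (unexpired.isEmpty()) {
                return null;
            }
            return unexpired.size() == elements.size() ? elements : unexpired;
        }
    }
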
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f84a2ee..c3f624a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.AbstractMap;
 import java.util.Collections;
@@ -44,12 +45,8 @@ import java.util.function.Function;
 class TtlMapState<K, N, UK, UV>
 	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
 	implements InternalMapState<K, N, UK, UV> {
-	TtlMapState(
-		InternalMapState<K, N, UK, TtlValue<UV>> original,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<Map<UK, UV>> valueSerializer) {
-		super(original, config, timeProvider, valueSerializer);
+	TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
+		super(ttlStateContext);
 	}
 
 	@Override
@@ -59,17 +56,20 @@ class TtlMapState<K, N, UK, UV>
 	}
 
 	private TtlValue<UV> getWrapped(UK key) throws Exception {
+		accessCallback.run();
 		return getWrappedWithTtlCheckAndUpdate(
 			() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
 	}
 
 	@Override
 	public void put(UK key, UV value) throws Exception {
+		accessCallback.run();
 		original.put(key, wrapWithTs(value));
 	}
 
 	@Override
 	public void putAll(Map<UK, UV> map) throws Exception {
+		accessCallback.run();
 		if (map == null) {
 			return;
 		}
@@ -84,6 +84,7 @@ class TtlMapState<K, N, UK, UV>
 
 	@Override
 	public void remove(UK key) throws Exception {
+		accessCallback.run();
 		original.remove(key);
 	}
 
@@ -100,6 +101,7 @@ class TtlMapState<K, N, UK, UV>
 
 	private <R> Iterable<R> entries(
 		Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
+		accessCallback.run();
 		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
 		return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
 	}
@@ -119,6 +121,21 @@ class TtlMapState<K, N, UK, UV>
 		return entries().iterator();
 	}
 
+	@Nullable
+	@Override
+	public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
+		Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
+		TypeSerializer<TtlValue<UV>> valueSerializer =
+			((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
+		for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
+			if (!expired(e.getValue())) {
+				// we have to do the defensive copy to update the value
+				unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
+			}
+		}
+		return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
+	}
+
 	@Override
 	public void clear() {
 		original.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 0710808..c3a8cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Collection;
 
 /**
@@ -34,24 +34,28 @@ import java.util.Collection;
 class TtlReducingState<K, N, T>
 	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
 	implements InternalReducingState<K, N, T> {
-	TtlReducingState(
-		InternalReducingState<K, N, TtlValue<T>> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<T> valueSerializer) {
-		super(originalState, config, timeProvider, valueSerializer);
+	TtlReducingState(TtlStateContext<InternalReducingState<K, N, TtlValue<T>>, T> ttlStateContext) {
+		super(ttlStateContext);
 	}
 
 	@Override
 	public T get() throws Exception {
+		accessCallback.run();
 		return getInternal();
 	}
 
 	@Override
 	public void add(T value) throws Exception {
+		accessCallback.run();
 		original.add(wrapWithTs(value));
 	}
 
+	@Nullable
+	@Override
+	public TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {
+		return expired(ttlValue) ? null : ttlValue;
+	}
+
 	@Override
 	public void clear() {
 		original.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
similarity index 54%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
index 7b19341..ca5f601 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
@@ -20,35 +20,29 @@ package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.internal.InternalValueState;
 
-import java.io.IOException;
+class TtlStateContext<T, SV> {
+	/** Wrapped original state handler. */
+	final T original;
+	final StateTtlConfig config;
+	final TtlTimeProvider timeProvider;
 
-/**
- * This class wraps value state with TTL logic.
- *
- * @param <K> The type of key the state is associated to
- * @param <N> The type of the namespace
- * @param <T> Type of the user value of state with TTL
- */
-class TtlValueState<K, N, T>
-	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
-	implements InternalValueState<K, N, T> {
-	TtlValueState(
-		InternalValueState<K, N, TtlValue<T>> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<T> valueSerializer) {
-		super(originalState, config, timeProvider, valueSerializer);
-	}
+	/** Serializer of original user stored value without timestamp. */
+	final TypeSerializer<SV> valueSerializer;
 
-	@Override
-	public T value() throws IOException {
-		return getWithTtlCheckAndUpdate(original::value, original::update);
-	}
+	/** This registered callback is to be called whenever state is accessed for read or write. */
+	final Runnable accessCallback;
 
-	@Override
-	public void update(T value) throws IOException {
-		original.update(wrapWithTs(value));
+	TtlStateContext(
+		T original,
+		StateTtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<SV> valueSerializer,
+		Runnable accessCallback) {
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.valueSerializer = valueSerializer;
+		this.accessCallback = accessCallback;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 0a881c7..453f43a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -31,13 +31,15 @@ import org.apache.flink.api.common.typeutils.CompositeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -46,44 +48,52 @@ import java.util.stream.Stream;
 /**
  * This state factory wraps state objects, produced by backends, with TTL logic.
  */
-public class TtlStateFactory<N, SV, S extends State, IS extends S> {
-	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
+public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
+	public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc,
-		KeyedStateFactory originalStateFactory,
+		KeyedStateBackend<K> stateBackend,
 		TtlTimeProvider timeProvider) throws Exception {
 		Preconditions.checkNotNull(namespaceSerializer);
 		Preconditions.checkNotNull(stateDesc);
-		Preconditions.checkNotNull(originalStateFactory);
+		Preconditions.checkNotNull(stateBackend);
 		Preconditions.checkNotNull(timeProvider);
 		return  stateDesc.getTtlConfig().isEnabled() ?
-			new TtlStateFactory<N, SV, S, IS>(
-				namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
+			new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
+				namespaceSerializer, stateDesc, stateBackend, timeProvider)
 				.createState() :
-			originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
+			stateBackend.createInternalState(namespaceSerializer, stateDesc);
 	}
 
 	private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;
 
+	@Nonnull
 	private final TypeSerializer<N> namespaceSerializer;
+	@Nonnull
 	private final StateDescriptor<S, SV> stateDesc;
-	private final KeyedStateFactory originalStateFactory;
+	@Nonnull
+	private final KeyedStateBackend<K> stateBackend;
+	@Nonnull
 	private final StateTtlConfig ttlConfig;
+	@Nonnull
 	private final TtlTimeProvider timeProvider;
 	private final long ttl;
+	@Nullable
+	private final TtlIncrementalCleanup<K, N, TTLSV> incrementalCleanup;
 
 	private TtlStateFactory(
-		TypeSerializer<N> namespaceSerializer,
-		StateDescriptor<S, SV> stateDesc,
-		KeyedStateFactory originalStateFactory,
-		TtlTimeProvider timeProvider) {
+		@Nonnull TypeSerializer<N> namespaceSerializer,
+		@Nonnull StateDescriptor<S, SV> stateDesc,
+		@Nonnull KeyedStateBackend<K> stateBackend,
+		@Nonnull TtlTimeProvider timeProvider) {
 		this.namespaceSerializer = namespaceSerializer;
 		this.stateDesc = stateDesc;
-		this.originalStateFactory = originalStateFactory;
+		this.stateBackend = stateBackend;
 		this.ttlConfig = stateDesc.getTtlConfig();
 		this.timeProvider = timeProvider;
 		this.ttl = ttlConfig.getTtl().toMilliseconds();
 		this.stateFactories = createStateFactories();
+		this.incrementalCleanup = getTtlIncrementalCleanup();
 	}
 
 	@SuppressWarnings("deprecation")
@@ -98,6 +108,7 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
 	}
 
+	@SuppressWarnings("unchecked")
 	private IS createState() throws Exception {
 		SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
 		if (stateFactory == null) {
@@ -105,16 +116,18 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 				stateDesc.getClass(), TtlStateFactory.class);
 			throw new FlinkRuntimeException(message);
 		}
-		return stateFactory.get();
+		IS state = stateFactory.get();
+		if (incrementalCleanup != null) {
+			incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
+		}
+		return state;
 	}
 
 	@SuppressWarnings("unchecked")
 	private IS createValueState() throws Exception {
 		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
 			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
-		return (IS) new TtlValueState<>(
-			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, stateDesc.getSerializer());
+		return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -122,10 +135,7 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
 		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
 			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
-		return (IS) new TtlListState<>(
-			originalStateFactory.createInternalState(
-				namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, listStateDesc.getSerializer());
+		return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -135,9 +145,7 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 			stateDesc.getName(),
 			mapStateDesc.getKeySerializer(),
 			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
-		return (IS) new TtlMapState<>(
-			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, mapStateDesc.getSerializer());
+		return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -147,9 +155,7 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 			stateDesc.getName(),
 			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
 			new TtlSerializer<>(stateDesc.getSerializer()));
-		return (IS) new TtlReducingState<>(
-			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, stateDesc.getSerializer());
+		return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -160,9 +166,7 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
 		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
 			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
-		return (IS) new TtlAggregatingState<>(
-			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
+		return (IS) new TtlAggregatingState<>(createTtlStateContext(ttlDescriptor), ttlAggregateFunction);
 	}
 
 	@SuppressWarnings({"deprecation", "unchecked"})
@@ -175,9 +179,47 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 			ttlInitAcc,
 			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
 			new TtlSerializer<>(stateDesc.getSerializer()));
-		return (IS) new TtlFoldingState<>(
-			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-			ttlConfig, timeProvider, stateDesc.getSerializer());
+		return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor));
+	}
+
+	@SuppressWarnings("unchecked")
+	private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
+		createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {
+
+		OIS originalState = (OIS) stateBackend.createInternalState(
+			namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
+		return new TtlStateContext<>(
+			originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
+			registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
+	}
+
+	private TtlIncrementalCleanup<K, N, TTLSV> getTtlIncrementalCleanup() {
+		StateTtlConfig.IncrementalCleanupStrategy config =
+			ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
+		return config != null ? new TtlIncrementalCleanup<>(config.getCleanupSize()) : null;
+	}
+
+	private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
+		StateTtlConfig.IncrementalCleanupStrategy config =
+			ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
+		boolean cleanupConfigured = config != null && incrementalCleanup != null;
+		boolean isCleanupActive = cleanupConfigured &&
+			isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
+		Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };
+		if (isCleanupActive && config.runCleanupForEveryRecord()) {
+			stateBackend.registerKeySelectionListener(stub -> callback.run());
+		}
+		return callback;
+	}
+
+	private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {
+		boolean stateIteratorSupported = false;
+		try {
+			stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;
+		} catch (Throwable t) {
+			// ignore
+		}
+		return stateIteratorSupported;
 	}
 
 	private StateSnapshotTransformFactory<?> getSnapshotTransformFactory() {
@@ -194,10 +236,12 @@ public class TtlStateFactory<N, SV, S extends State, IS extends S> {
 	public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
 		private static final long serialVersionUID = 131020282727167064L;
 
+		@SuppressWarnings("WeakerAccess")
 		public TtlSerializer(TypeSerializer<T> userValueSerializer) {
 			super(true, LongSerializer.INSTANCE, userValueSerializer);
 		}
 
+		@SuppressWarnings("WeakerAccess")
 		public TtlSerializer(PrecomputedParameters precomputed, TypeSerializer<?> ... fieldSerializers) {
 			super(precomputed, fieldSerializers);
 		}
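
The signature change from KeyedStateFactory to KeyedStateBackend is what makes the callback wiring possible: the backend both creates the underlying TTL-serialized state and exposes registerKeySelectionListener. A hedged call-site sketch of the static entry point (the helper class around it is illustrative, not part of the commit):

    import org.apache.flink.api.common.state.State;
    import org.apache.flink.api.common.state.StateDescriptor;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.runtime.state.KeyedStateBackend;
    import org.apache.flink.runtime.state.ttl.TtlStateFactory;
    import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

    class TtlStateCreationSketch {

        // The backend passes itself to the factory so that the factory can create the
        // underlying state and register the incremental cleanup callbacks against it.
        static <K, N, SV, S extends State, IS extends S> IS createTtlAwareState(
                KeyedStateBackend<K> backend,
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, SV> stateDescriptor) throws Exception {

            return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, backend, TtlTimeProvider.DEFAULT);
        }
    }
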
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
index 7b19341..54fb501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -34,21 +34,25 @@ import java.io.IOException;
 class TtlValueState<K, N, T>
 	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
 	implements InternalValueState<K, N, T> {
-	TtlValueState(
-		InternalValueState<K, N, TtlValue<T>> originalState,
-		StateTtlConfig config,
-		TtlTimeProvider timeProvider,
-		TypeSerializer<T> valueSerializer) {
-		super(originalState, config, timeProvider, valueSerializer);
+	TtlValueState(TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> ttlStateContext) {
+		super(ttlStateContext);
 	}
 
 	@Override
 	public T value() throws IOException {
+		accessCallback.run();
 		return getWithTtlCheckAndUpdate(original::value, original::update);
 	}
 
 	@Override
 	public void update(T value) throws IOException {
+		accessCallback.run();
 		original.update(wrapWithTs(value));
 	}
+
+	@Nullable
+	@Override
+	public TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {
+		return expired(ttlValue) ? null : ttlValue;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index 82d89e0..6a9e543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -323,6 +323,11 @@ public class KvStateRegistryTest extends TestLogger {
 		}
 
 		@Override
+		public StateIncrementalVisitor<Integer, VoidNamespace, String> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public void clear() {
 			// noop
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index ba79004..3dd4fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -28,8 +28,10 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateEntry;
 import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
@@ -208,6 +210,9 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 				}
 			};
 
+		StateIncrementalVisitor<Integer, Integer, ArrayList<Integer>> updatingIterator =
+			stateTable.getStateIncrementalVisitor(5);
+
 		// the main loop for modifications
 		for (int i = 0; i < 10_000_000; ++i) {
 
@@ -215,7 +220,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 			int namespace = random.nextInt(4);
 			Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);
 
-			int op = random.nextInt(7);
+			int op = random.nextInt(10);
 
 			ArrayList<Integer> state = null;
 			ArrayList<Integer> referenceState = null;
@@ -260,6 +265,18 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 						referenceMap.remove(compositeKey), updateValue));
 					break;
 				}
+				case 7:
+				case 8:
+				case 9:
+					if (!updatingIterator.hasNext()) {
+						updatingIterator = stateTable.getStateIncrementalVisitor(5);
+						if (!updatingIterator.hasNext()) {
+							break;
+						}
+					}
+					testStateIteratorWithUpdate(
+						updatingIterator, stateTable, referenceMap, op == 8, op == 9);
+					break;
 				default: {
 					Assert.fail("Unknown op-code " + op);
 				}
@@ -316,6 +333,40 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	}
 
 	/**
+	 * Test operations specific for StateIncrementalVisitor in {@code testRandomModificationsAndCopyOnWriteIsolation()}.
+	 *
+	 * <p>Check next, update and remove during global iteration of StateIncrementalVisitor.
+	 */
+	private static void testStateIteratorWithUpdate(
+		StateIncrementalVisitor<Integer, Integer, ArrayList<Integer>> updatingIterator,
+		CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable,
+		HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap,
+		boolean update, boolean remove) {
+
+		for (StateEntry<Integer, Integer, ArrayList<Integer>> stateEntry : updatingIterator.nextEntries()) {
+			Integer key = stateEntry.getKey();
+			Integer namespace = stateEntry.getNamespace();
+			Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);
+			Assert.assertEquals(referenceMap.get(compositeKey), stateEntry.getState());
+
+			if (update) {
+				ArrayList<Integer> newState = new ArrayList<>(stateEntry.getState());
+				if (!newState.isEmpty()) {
+					newState.remove(0);
+				}
+				updatingIterator.update(stateEntry, newState);
+				referenceMap.put(compositeKey, new ArrayList<>(newState));
+				Assert.assertEquals(newState, stateTable.get(key, namespace));
+			}
+
+			if (remove) {
+				updatingIterator.remove(stateEntry);
+				referenceMap.remove(compositeKey);
+			}
+		}
+	}
+
+	/**
 	 * This tests for the copy-on-write contracts, e.g. ensures that no copy-on-write is active after all snapshots are
 	 * released.
 	 */
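
The randomized test above exercises the visitor contract that TtlIncrementalCleanup relies on: batches come from nextEntries(), and entries may be removed or updated in place while the global iteration continues. A hedged sketch of a generic driver for that contract, here as a full sweep rather than the bounded per-access steps used by the TTL cleanup (the process function is an illustrative stand-in for getUnexpiredOrNull):

    import java.util.Collection;

    import org.apache.flink.runtime.state.StateEntry;
    import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;

    class IncrementalVisitorDriverSketch {

        static <K, N, S> void sweep(
                StateIncrementalVisitor<K, N, S> visitor,
                java.util.function.Function<S, S> process) {

            while (visitor.hasNext()) {
                Collection<StateEntry<K, N, S>> batch = visitor.nextEntries();
                for (StateEntry<K, N, S> entry : batch) {
                    S newState = process.apply(entry.getState());
                    if (newState == null) {
                        visitor.remove(entry);           // drop fully expired entries
                    } else if (newState != entry.getState()) {
                        visitor.update(entry, newState); // rewrite partially expired entries
                    }
                }
            }
        }
    }
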
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 41ef0c9..d7fd3b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateEntry;
 import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapAsyncSnapshotTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapAsyncSnapshotTtlStateTest.java
index 02e09cf..0c9c943 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapAsyncSnapshotTtlStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapAsyncSnapshotTtlStateTest.java
@@ -32,4 +32,9 @@ public class HeapAsyncSnapshotTtlStateTest extends TtlStateTestBase {
 			}
 		};
 	}
+
+	@Override
+	protected boolean incrementalCleanupSupported() {
+		return true;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapSyncSnapshotTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapSyncSnapshotTtlStateTest.java
index c2c0cd2..d9fbedf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapSyncSnapshotTtlStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapSyncSnapshotTtlStateTest.java
@@ -32,4 +32,9 @@ public class HeapSyncSnapshotTtlStateTest extends TtlStateTestBase {
 			}
 		};
 	}
+
+	@Override
+	protected boolean incrementalCleanupSupported() {
+		return true;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index c1de3cb..7c9f848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -90,13 +90,18 @@ public abstract class StateBackendTestContext {
 
 	@Nonnull
 	KeyedStateHandle takeSnapshot() throws Exception {
+		return triggerSnapshot().get().getJobManagerOwnedSnapshot();
+	}
+
+	@Nonnull
+	RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot() throws Exception {
 		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
 			keyedStateBackend.snapshot(682375462392L, 10L,
 				checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation());
 		if (!snapshotRunnableFuture.isDone()) {
 			snapshotRunnableFuture.run();
 		}
-		return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+		return snapshotRunnableFuture;
 	}
 
 	void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 9b6882a..aa80f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.StateMigrationException;
 
@@ -35,17 +37,21 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.RunnableFuture;
 import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeThat;
+import static org.junit.Assume.assumeTrue;
 
 /** State TTL base test suite. */
 @RunWith(Parameterized.class)
 public abstract class TtlStateTestBase {
 	private static final long TTL = 100;
+	private static final int INC_CLEANUP_ALL_KEYS =
+		(CopyOnWriteStateTable.DEFAULT_CAPACITY >> 1) + (CopyOnWriteStateTable.DEFAULT_CAPACITY >> 2) + 1;
 
 	private MockTtlTimeProvider timeProvider;
 	private StateBackendTestContext sbetc;
@@ -127,13 +133,20 @@ public abstract class TtlStateTestBase {
 	}
 
 	private void takeAndRestoreSnapshot() throws Exception {
-		KeyedStateHandle snapshot = sbetc.takeSnapshot();
+		restoreSnapshot(sbetc.takeSnapshot());
+	}
+
+	private void restoreSnapshot(KeyedStateHandle snapshot) throws Exception {
 		sbetc.createAndRestoreKeyedStateBackend();
 		sbetc.restoreSnapshot(snapshot);
 		sbetc.setCurrentKey("defaultKey");
 		createState();
 	}
 
+	protected boolean incrementalCleanupSupported() {
+		return false;
+	}
+
 	@Test
 	public void testNonExistentValue() throws Exception {
 		initTest();
@@ -403,6 +416,92 @@ public abstract class TtlStateTestBase {
 		sbetc.createState(ctx().createStateDescriptor(), "");
 	}
 
+	@Test
+	public void testIncrementalCleanup() throws Exception {
+		assumeTrue(incrementalCleanupSupported());
+
+		initTest(getConfBuilder(TTL).cleanupIncrementally(5, true).build());
+
+		final int keysToUpdate = CopyOnWriteStateTable.DEFAULT_CAPACITY >> 3;
+
+		timeProvider.time = 0;
+		// create enough keys to trigger incremental rehash
+		updateKeys(0, INC_CLEANUP_ALL_KEYS, ctx().updateEmpty);
+
+		timeProvider.time = 50;
+		// update some
+		updateKeys(0, keysToUpdate, ctx().updateUnexpired);
+
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture = sbetc.triggerSnapshot();
+
+		// update more concurrently with snapshotting
+		updateKeys(keysToUpdate, keysToUpdate * 2, ctx().updateUnexpired);
+		timeProvider.time = 120; // expire rest
+		triggerMoreIncrementalCleanupByOtherOps();
+		// check rest expired and cleanup updated
+		checkExpiredKeys(keysToUpdate * 2, INC_CLEANUP_ALL_KEYS);
+
+		KeyedStateHandle snapshot = snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+		// restore snapshot which should discard concurrent updates
+		timeProvider.time = 50;
+		restoreSnapshot(snapshot);
+
+		// check rest unexpired, also after restore which should discard concurrent updates
+		checkUnexpiredKeys(keysToUpdate, INC_CLEANUP_ALL_KEYS, ctx().getUpdateEmpty);
+
+		timeProvider.time = 120;
+
+		// remove some
+		for (int i = keysToUpdate >> 1; i < keysToUpdate + (keysToUpdate >> 2); i++) {
+			sbetc.setCurrentKey(Integer.toString(i));
+			ctx().ttlState.clear();
+		}
+		// check updated not expired
+		checkUnexpiredKeys(0, keysToUpdate >> 1, ctx().getUnexpired);
+		triggerMoreIncrementalCleanupByOtherOps();
+		// check that keys which were updated concurrently and then restored with their original values have expired
+		checkExpiredKeys(keysToUpdate, keysToUpdate * 2);
+
+		timeProvider.time = 170;
+		// check rest expired and cleanup updated
+		checkExpiredKeys(keysToUpdate >> 1, INC_CLEANUP_ALL_KEYS);
+		// check updated expired
+		checkExpiredKeys(0, keysToUpdate >> 1);
+	}
+
+	private <T> void updateKeys(int startKey, int endKey, T value) throws Exception {
+		for (int i = startKey; i < endKey; i++) {
+			sbetc.setCurrentKey(Integer.toString(i));
+			ctx().update(value);
+		}
+	}
+
+	private <T> void checkUnexpiredKeys(int startKey, int endKey, T value) throws Exception {
+		for (int i = startKey; i < endKey; i++) {
+			sbetc.setCurrentKey(Integer.toString(i));
+			assertEquals("Unexpired state should be available", value, ctx().get());
+		}
+	}
+
+	private void checkExpiredKeys(int startKey, int endKey) throws Exception {
+		for (int i = startKey; i < endKey; i++) {
+			sbetc.setCurrentKey(Integer.toString(i));
+			assertEquals("Original state should be cleared", ctx().emptyValue, ctx().getOriginal());
+		}
+	}
+
+	private void triggerMoreIncrementalCleanupByOtherOps() throws Exception {
+		// trigger more cleanup by touching keys outside of the INC_CLEANUP_ALL_KEYS range
+		for (int i = INC_CLEANUP_ALL_KEYS; i < INC_CLEANUP_ALL_KEYS * 2; i++) {
+			sbetc.setCurrentKey(Integer.toString(i));
+			if (i % 2 == 0) {
+				ctx().get();
+			} else {
+				ctx().update(ctx().updateEmpty);
+			}
+		}
+	}
+
 	@After
 	public void tearDown() {
 		sbetc.disposeKeyedStateBackend();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
index 3f94669..ae5b336 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
@@ -85,4 +85,9 @@ abstract class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T>
 	private Map<Object, Object> getCurrentKeyValues() {
 		return values.get();
 	}
+
+	@Override
+	public StateIncrementalVisitor<K, N, T> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		throw new UnsupportedOperationException();
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index 9a899f4..907a1e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 
 /** Mock state backend. */
 public class MockStateBackend extends AbstractStateBackend {
@@ -70,17 +71,17 @@ public class MockStateBackend extends AbstractStateBackend {
 				return new CheckpointStorageLocation() {
 
 					@Override
-					public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) {
+					public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
 						return null;
 					}
 
 					@Override
-					public CheckpointMetadataOutputStream createMetadataOutputStream() {
+					public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
 						return null;
 					}
 
 					@Override
-					public void disposeOnFailure() {
+					public void disposeOnFailure() throws IOException {
 
 					}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index fb19a8a..4ac912c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -235,4 +235,9 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 			return null;
 		}
 	}
+
+	@Override
+	public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
+		throw new UnsupportedOperationException("Global state entry iterator is unsupported for RocksDb backend");
+	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
index 7a817eb..039fba9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
@@ -101,7 +101,10 @@ public class RocksDBNativeMetricMonitorTest {
 	}
 
 	@Test
-	public void testReturnsUnsigned() {
+	public void testReturnsUnsigned() throws Throwable {
+		RocksDBResource localRocksDBResource = new RocksDBResource();
+		localRocksDBResource.before();
+
 		SimpleMetricRegistry registry = new SimpleMetricRegistry();
 		GenericMetricGroup group = new GenericMetricGroup(
 			registry,
@@ -113,17 +116,20 @@ public class RocksDBNativeMetricMonitorTest {
 		options.enableSizeAllMemTables();
 
 		RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
-			null,
+			localRocksDBResource.getRocksDB(),
 			options,
 			group
 		);
 
-		monitor.registerColumnFamily(COLUMN_FAMILY_NAME, null);
+		ColumnFamilyHandle handle = localRocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+		monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
 		RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
 
 		view.setValue(-1);
 		BigInteger result = view.getValue();
 
+		localRocksDBResource.after();
+
 		Assert.assertEquals("Failed to interpret RocksDB result as an unsigned long", 1, result.signum());
 	}