You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2024/01/04 11:13:56 UTC

(flink-benchmarks) branch master updated (e0d922c -> 9fb3482)

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


    from e0d922c  [hotfix] Relocate the HighAvailabilityServiceBenchmark (#82)
     new 9da39ce  [FLINK-30535] Introduce TTL state based benchmarks
     new 9fb3482  [FLINK-30535] Use customized TtlTimeProvider

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/state/benchmark/StateBenchmarkBase.java  |  25 +++--
 .../state/benchmark/StateBenchmarkConstants.java   |  28 +++---
 .../TtlListStateBenchmark.java}                    |  34 ++++---
 .../TtlMapStateBenchmark.java}                     |  72 ++++----------
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 108 +++++++++++++++++++++
 .../TtlValueStateBenchmark.java}                   |  18 ++--
 6 files changed, 185 insertions(+), 100 deletions(-)
 copy src/main/java/org/apache/flink/state/benchmark/{ListStateBenchmark.java => ttl/TtlListStateBenchmark.java} (80%)
 copy src/main/java/org/apache/flink/state/benchmark/{MapStateBenchmark.java => ttl/TtlMapStateBenchmark.java} (64%)
 create mode 100644 src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
 copy src/main/java/org/apache/flink/state/benchmark/{ValueStateBenchmark.java => ttl/TtlValueStateBenchmark.java} (82%)


(flink-benchmarks) 02/02: [FLINK-30535] Use customized TtlTimeProvider

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 9fb3482dc772cf19c87bd9e6d4a1709ada8ace65
Author: Zakelly <za...@gmail.com>
AuthorDate: Wed Dec 27 19:43:22 2023 +0800

    [FLINK-30535] Use customized TtlTimeProvider
---
 .../flink/state/benchmark/StateBenchmarkBase.java  |  5 ++
 .../state/benchmark/ttl/TtlListStateBenchmark.java | 14 ++--
 .../state/benchmark/ttl/TtlMapStateBenchmark.java  |  7 ++
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java | 80 +++++++++++++++++++---
 .../benchmark/ttl/TtlValueStateBenchmark.java      |  8 +++
 5 files changed, 98 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index 6b5e89e..99e9c48 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.config.StateBenchmarkOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -62,6 +63,10 @@ public class StateBenchmarkBase extends BenchmarkBase {
     protected KeyedStateBackend<Long> keyedStateBackend;
 
     protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
+        return createKeyedStateBackend(TtlTimeProvider.DEFAULT);
+    }
+
+    protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) throws Exception {
         Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
         String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
         File dataDir = null;
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 4853c87..1977cf6 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -46,8 +46,7 @@ import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyC
 /** Implementation for list state benchmark testing. */
 public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
     private final String STATE_NAME = "listState";
-    private final ListStateDescriptor<Long> STATE_DESC =
-            configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+    private ListStateDescriptor<Long> stateDesc;
     private ListState<Long> listState;
     private List<Long> dummyLists;
 
@@ -64,7 +63,8 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
     @Setup
     public void setUp() throws Exception {
         keyedStateBackend = createKeyedStateBackend();
-        listState = getListState(keyedStateBackend, STATE_DESC);
+        stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+        listState = getListState(keyedStateBackend, stateDesc);
         dummyLists = new ArrayList<>(listValueCount);
         for (int i = 0; i < listValueCount; ++i) {
             dummyLists.add(random.nextLong());
@@ -76,6 +76,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
     public void setUpPerIteration() throws Exception {
         for (int i = 0; i < setupKeyCount; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
+            setTtlWhenInitialization();
             listState.add(random.nextLong());
         }
         // make sure only one sst file left, so all get invocation will access this single file,
@@ -84,15 +85,16 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
         if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
             RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
                     (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+            compactState(rocksDBKeyedStateBackend, stateDesc);
         }
+        advanceTimePerIteration();
     }
 
     @TearDown(Level.Iteration)
     public void tearDownPerIteration() throws Exception {
         applyToAllKeys(
                 keyedStateBackend,
-                STATE_DESC,
+                stateDesc,
                 (k, state) -> {
                     keyedStateBackend.setCurrentKey(k);
                     state.clear();
@@ -101,7 +103,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
         if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
             RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
                     (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+            compactState(rocksDBKeyedStateBackend, stateDesc);
         } else {
             System.gc();
         }
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
index ec2ccb1..772a103 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.infra.Blackhole;
@@ -70,12 +71,18 @@ public class TtlMapStateBenchmark extends TtlStateBenchmarkBase {
         for (int i = 0; i < setupKeyCount; ++i) {
             keyedStateBackend.setCurrentKey((long) i);
             for (int j = 0; j < mapKeyCount; j++) {
+                setTtlWhenInitialization();
                 mapState.put(mapKeys.get(j), random.nextDouble());
             }
         }
         keyIndex = new AtomicInteger();
     }
 
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        advanceTimePerIteration();
+    }
+
     @Benchmark
     public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception {
         keyedStateBackend.setCurrentKey(keyValue.setUpKey);
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
index 12d62e9..45d28d2 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.state.benchmark.ttl;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Param;
 
@@ -11,38 +31,78 @@ import java.util.concurrent.TimeUnit;
 /** The base class for state tests with ttl. */
 public class TtlStateBenchmarkBase extends StateBenchmarkBase {
 
+    private static final long initialTime = 1000000;
+
     /** The expired time of ttl. */
     public enum ExpiredTimeOptions {
 
-        /** 5 seconds. */
-        Seconds5(5000),
+        /** Expire 3 percent of the initial keys per iteration. */
+        Expire3PercentPerIteration(3),
 
         /** never expired but enable the ttl. */
-        MaxTime(Long.MAX_VALUE);
+        NeverExpired(0);
 
-        private Time time;
-        ExpiredTimeOptions(long mills) {
-            time = Time.of(mills, TimeUnit.MILLISECONDS);
+        public long advanceTimePerIteration;
+        ExpiredTimeOptions(int expirePercentPerIteration) {
+            this.advanceTimePerIteration = initialTime * expirePercentPerIteration / 100;
         }
     }
 
-    @Param({"Seconds5", "MaxTime"})
-    protected ExpiredTimeOptions expiredTime;
+    @Param({"Expire3PercentPerIteration", "NeverExpired"})
+    protected ExpiredTimeOptions expiredOption;
 
     @Param({"OnCreateAndWrite", "OnReadAndWrite"})
     protected StateTtlConfig.UpdateType updateType;
 
-    @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"})
+    @Param({"NeverReturnExpired", "ReturnExpiredIfNotCleanedUp"})
     protected StateTtlConfig.StateVisibility stateVisibility;
 
+    protected ControllableTtlTimeProvider timeProvider;
+
     /** Configure the state descriptor with ttl. */
     protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) {
         StateTtlConfig ttlConfig =
-                new StateTtlConfig.Builder(expiredTime.time)
+                new StateTtlConfig.Builder(Time.of(initialTime, TimeUnit.MILLISECONDS))
                         .setUpdateType(updateType)
                         .setStateVisibility(stateVisibility)
                         .build();
         stateDescriptor.enableTimeToLive(ttlConfig);
         return stateDescriptor;
     }
+
+    @Override
+    protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
+        timeProvider = new ControllableTtlTimeProvider();
+        return createKeyedStateBackend(timeProvider);
+    }
+
+    protected void setTtlWhenInitialization() {
+        timeProvider.setCurrentTimestamp(random.nextLong(initialTime + 1));
+    }
+
+    protected void finishInitialization() {
+        timeProvider.setCurrentTimestamp(initialTime);
+    }
+
+    protected void advanceTimePerIteration() {
+        timeProvider.advanceTimestamp(expiredOption.advanceTimePerIteration);
+    }
+
+    static class ControllableTtlTimeProvider implements TtlTimeProvider {
+
+        long current = 0L;
+
+        @Override
+        public long currentTimestamp() {
+            return current;
+        }
+
+        public void setCurrentTimestamp(long value) {
+            current = value;
+        }
+
+        public void advanceTimestamp(long value) {
+            current += value;
+        }
+    }
 }
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
index 7b3ca27..ee34cfb 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.benchmark.ttl;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
@@ -53,10 +54,17 @@ public class TtlValueStateBenchmark extends TtlStateBenchmarkBase {
         keyedStateBackend = createKeyedStateBackend();
         valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class)));
         for (int i = 0; i < setupKeyCount; ++i) {
+            setTtlWhenInitialization();
             keyedStateBackend.setCurrentKey((long) i);
             valueState.update(random.nextLong());
         }
         keyIndex = new AtomicInteger();
+        finishInitialization();
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        advanceTimePerIteration();
     }
 
     @Benchmark


(flink-benchmarks) 01/02: [FLINK-30535] Introduce TTL state based benchmarks

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 9da39cec93d65b0ef9770080f2823a6d140f1d2c
Author: Zakelly <za...@gmail.com>
AuthorDate: Sat Dec 16 19:19:55 2023 +0800

    [FLINK-30535] Introduce TTL state based benchmarks
---
 .../flink/state/benchmark/StateBenchmarkBase.java  |  20 +--
 .../state/benchmark/StateBenchmarkConstants.java   |  28 ++--
 .../state/benchmark/ttl/TtlListStateBenchmark.java | 150 +++++++++++++++++++++
 .../state/benchmark/ttl/TtlMapStateBenchmark.java  | 120 +++++++++++++++++
 .../state/benchmark/ttl/TtlStateBenchmarkBase.java |  48 +++++++
 .../benchmark/ttl/TtlValueStateBenchmark.java      |  79 +++++++++++
 6 files changed, 421 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index bc8bdb9..6b5e89e 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -53,13 +53,13 @@ import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys
 /** Base implementation of the state benchmarks. */
 public class StateBenchmarkBase extends BenchmarkBase {
     // TODO: why AtomicInteger?
-    static AtomicInteger keyIndex;
-    final ThreadLocalRandom random = ThreadLocalRandom.current();
+    protected static AtomicInteger keyIndex;
+    protected final ThreadLocalRandom random = ThreadLocalRandom.current();
 
     @Param({"HEAP", "ROCKSDB", "ROCKSDB_CHANGELOG"})
-    private StateBackendBenchmarkUtils.StateBackendType backendType;
+    protected StateBackendBenchmarkUtils.StateBackendType backendType;
 
-    KeyedStateBackend<Long> keyedStateBackend;
+    protected KeyedStateBackend<Long> keyedStateBackend;
 
     protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
         Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
@@ -89,12 +89,12 @@ public class StateBenchmarkBase extends BenchmarkBase {
 
     @State(Scope.Thread)
     public static class KeyValue {
-        long newKey;
-        long setUpKey;
-        long mapKey;
-        double mapValue;
-        long value;
-        List<Long> listValue;
+        public long newKey;
+        public long setUpKey;
+        public long mapKey;
+        public double mapValue;
+        public long value;
+        public List<Long> listValue;
 
         @Setup(Level.Invocation)
         public void kvSetup() {
diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
index 1bb9eed..c0a141f 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
@@ -26,22 +26,22 @@ import java.util.Random;
  * Constants for state benchmark tests. Also generates random keys/values in advance to avoid
  * possible affect of using {@link Random#nextLong()}
  */
-class StateBenchmarkConstants {
+public class StateBenchmarkConstants {
     // TODO: why all of those static fields? Those should be inside a context class
-    static final int mapKeyCount = 10;
-    static final int listValueCount = 100;
-    static final int setupKeyCount = 500_000;
-    static final String rootDirName = "benchmark";
-    static final String recoveryDirName = "localRecovery";
-    static final String dbDirName = "dbPath";
+    public static final int mapKeyCount = 10;
+    public static final int listValueCount = 100;
+    public static final int setupKeyCount = 500_000;
+    public static final String rootDirName = "benchmark";
+    public static final String recoveryDirName = "localRecovery";
+    public static final String dbDirName = "dbPath";
 
-    static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
-    static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount);
-    static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount);
-    static final int newKeyCount = 500_000;
-    static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
-    static final int randomValueCount = 1_000_000;
-    static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount);
+    public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
+    public static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount);
+    public static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount);
+    public static final int newKeyCount = 500_000;
+    public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
+    public static final int randomValueCount = 1_000_000;
+    public static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount);
 
     static {
         for (int i = 0; i < mapKeyCount; i++) {
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
new file mode 100644
index 0000000..4853c87
--- /dev/null
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+    private final String STATE_NAME = "listState";
+    private final ListStateDescriptor<Long> STATE_DESC =
+            configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+    private ListState<Long> listState;
+    private List<Long> dummyLists;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        listState = getListState(keyedStateBackend, STATE_DESC);
+        dummyLists = new ArrayList<>(listValueCount);
+        for (int i = 0; i < listValueCount; ++i) {
+            dummyLists.add(random.nextLong());
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            listState.add(random.nextLong());
+        }
+        // make sure only one sst file left, so all get invocation will access this single file,
+        // to prevent the spike caused by different key distribution in multiple sst files,
+        // the more access to the older sst file, the lower throughput will be.
+        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
+            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
+                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
+            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+        }
+    }
+
+    @TearDown(Level.Iteration)
+    public void tearDownPerIteration() throws Exception {
+        applyToAllKeys(
+                keyedStateBackend,
+                STATE_DESC,
+                (k, state) -> {
+                    keyedStateBackend.setCurrentKey(k);
+                    state.clear();
+                });
+        // make the clearance effective, trigger compaction for RocksDB, and GC for heap.
+        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
+            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
+                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
+            compactState(rocksDBKeyedStateBackend, STATE_DESC);
+        } else {
+            System.gc();
+        }
+        // wait a while for the clearance to take effect.
+        Thread.sleep(1000);
+    }
+
+    @Benchmark
+    public void listUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.update(keyValue.listValue);
+    }
+
+    @Benchmark
+    public void listAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.newKey);
+        listState.update(keyValue.listValue);
+    }
+
+    @Benchmark
+    public void listAppend(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.add(keyValue.value);
+    }
+
+    @Benchmark
+    public Iterable<Long> listGet(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return listState.get();
+    }
+
+    @Benchmark
+    public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        Iterable<Long> iterable = listState.get();
+        for (Long value : iterable) {
+            bh.consume(value);
+        }
+    }
+
+    @Benchmark
+    public void listAddAll(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        listState.addAll(dummyLists);
+    }
+}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
new file mode 100644
index 0000000..ec2ccb1
--- /dev/null
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for map state benchmark testing. */
+public class TtlMapStateBenchmark extends TtlStateBenchmarkBase {
+    private MapState<Long, Double> mapState;
+    private Map<Long, Double> dummyMaps;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + TtlMapStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        mapState =
+                getMapState(
+                        keyedStateBackend,
+                        configTtl(new MapStateDescriptor<>("mapState", Long.class, Double.class)));
+        dummyMaps = new HashMap<>(mapKeyCount);
+        for (int i = 0; i < mapKeyCount; ++i) {
+            dummyMaps.put(mapKeys.get(i), random.nextDouble());
+        }
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            for (int j = 0; j < mapKeyCount; j++) {
+                mapState.put(mapKeys.get(j), random.nextDouble());
+            }
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Benchmark
+    public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        mapState.put(keyValue.mapKey, keyValue.mapValue);
+    }
+
+    @Benchmark
+    public void mapAdd(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.newKey);
+        mapState.put(keyValue.mapKey, keyValue.mapValue);
+    }
+
+    @Benchmark
+    public Double mapGet(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return mapState.get(keyValue.mapKey);
+    }
+
+    @Benchmark
+    public boolean mapIsEmpty(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        return mapState.isEmpty();
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(mapKeyCount)
+    public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, Double> entry = iterator.next();
+            bh.consume(entry.getKey());
+            bh.consume(entry.getValue());
+        }
+    }
+
+    @Benchmark
+    public void mapPutAll(StateBenchmarkBase.KeyValue keyValue) throws Exception {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
+        mapState.putAll(dummyMaps);
+    }
+}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
new file mode 100644
index 0000000..12d62e9
--- /dev/null
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -0,0 +1,48 @@
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Param;
+
+import java.util.concurrent.TimeUnit;
+
+/** Base class for state benchmarks that enable state TTL on their state descriptors. */
+public class TtlStateBenchmarkBase extends StateBenchmarkBase {
+
+    /** TTL durations that a benchmark run can be parameterized with. */
+    public enum ExpiredTimeOptions {
+
+        /** Expires after 5 seconds (5000 ms). */
+        Seconds5(5000),
+
+        /** TTL is enabled, but the duration is Long.MAX_VALUE ms, i.e. effectively never expires. */
+        MaxTime(Long.MAX_VALUE);
+
+        private final Time time; // final: enum constants are shared, so the duration must not change
+        ExpiredTimeOptions(long millis) {
+            time = Time.of(millis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Param({"Seconds5", "MaxTime"}) // JMH parameter: TTL duration, one run per listed constant
+    protected ExpiredTimeOptions expiredTime;
+
+    @Param({"OnCreateAndWrite", "OnReadAndWrite"}) // JMH parameter: passed to Builder.setUpdateType
+    protected StateTtlConfig.UpdateType updateType;
+
+    @Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"}) // JMH parameter: visibility of expired values
+    protected StateTtlConfig.StateVisibility stateVisibility;
+
+    /** Enables TTL on {@code stateDescriptor} from the three parameters above; returns the same instance. */
+    protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) {
+        StateTtlConfig ttlConfig =
+                new StateTtlConfig.Builder(expiredTime.time)
+                        .setUpdateType(updateType)
+                        .setStateVisibility(stateVisibility)
+                        .build();
+        stateDescriptor.enableTimeToLive(ttlConfig); // mutates the descriptor that was passed in
+        return stateDescriptor;
+    }
+}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
new file mode 100644
index 0000000..7b3ca27
--- /dev/null
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Benchmarks for {@code ValueState} operations on a state whose descriptor has TTL enabled. */
+public class TtlValueStateBenchmark extends TtlStateBenchmarkBase {
+    private ValueState<Long> valueState; // state under test; created in setUp()
+
+    public static void main(String[] args) throws RunnerException { // stand-alone JMH entry point
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + TtlValueStateBenchmark.class.getCanonicalName() + ".*") // only this class
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup // Creates the backend and the TTL-configured value state, then seeds setupKeyCount keys.
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend(); // helper and field are not declared in this file
+        valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class)));
+        for (int i = 0; i < setupKeyCount; ++i) { // keys 0 .. setupKeyCount-1 each get one random value
+            keyedStateBackend.setCurrentKey((long) i);
+            valueState.update(random.nextLong());
+        }
+        keyIndex = new AtomicInteger(); // fresh counter; keyIndex is declared and consumed outside this file
+    }
+
+    @Benchmark // Measures ValueState.update for the backend key keyValue.setUpKey.
+    public void valueUpdate(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey); // presumably one of the keys seeded in setUp() - confirm
+        valueState.update(keyValue.value);
+    }
+
+    @Benchmark // Measures ValueState.update for the backend key keyValue.newKey.
+    public void valueAdd(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.newKey); // presumably a key not seeded in setUp() - confirm
+        valueState.update(keyValue.value);
+    }
+
+    @Benchmark // Measures ValueState.value for the backend key keyValue.setUpKey.
+    public Long valueGet(KeyValue keyValue) throws IOException {
+        keyedStateBackend.setCurrentKey(keyValue.setUpKey); // presumably one of the keys seeded in setUp() - confirm
+        return valueState.value(); // read value is returned to the harness, not dropped
+    }
+}