You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2023/12/26 05:45:04 UTC

(flink) 01/02: [FLINK-30535] Customize TtlTimeProvider in state benchmarks

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c2f9f8f6824137d14715be9ff3c714d565da86a
Author: Zakelly <za...@gmail.com>
AuthorDate: Sun Dec 24 15:13:05 2023 +0800

    [FLINK-30535] Customize TtlTimeProvider in state benchmarks
---
 .../benchmark/StateBackendBenchmarkUtils.java      | 37 ++++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
index 09c15a7eb14..63199d9b4ca 100644
--- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
@@ -83,33 +83,42 @@ public class StateBackendBenchmarkUtils {
     private static File rootDir;
 
     public static KeyedStateBackend<Long> createKeyedStateBackend(
-            StateBackendType backendType, File baseDir) throws IOException {
+            StateBackendType backendType, File baseDir, TtlTimeProvider ttlTimeProvider)
+            throws IOException {
         switch (backendType) {
             case HEAP:
                 rootDir = prepareDirectory(rootDirName, baseDir);
-                return createHeapKeyedStateBackend(rootDir);
+                return createHeapKeyedStateBackend(rootDir, ttlTimeProvider);
             case ROCKSDB:
                 rootDir = prepareDirectory(rootDirName, baseDir);
-                return createRocksDBKeyedStateBackend(rootDir);
+                return createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider);
             case HEAP_CHANGELOG:
                 rootDir = prepareDirectory(rootDirName, baseDir);
-                return createChangelogKeyedStateBackend(createHeapKeyedStateBackend(rootDir));
+                return createChangelogKeyedStateBackend(
+                        createHeapKeyedStateBackend(rootDir, ttlTimeProvider));
             case ROCKSDB_CHANGELOG:
                 rootDir = prepareDirectory(rootDirName, baseDir);
-                return createChangelogKeyedStateBackend(createRocksDBKeyedStateBackend(rootDir));
+                return createChangelogKeyedStateBackend(
+                        createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider));
             case BATCH_EXECUTION:
-                return createBatchExecutionStateBackend();
+                return createBatchExecutionStateBackend(ttlTimeProvider);
             default:
                 throw new IllegalArgumentException("Unknown backend type: " + backendType);
         }
     }
 
+    public static KeyedStateBackend<Long> createKeyedStateBackend(
+            StateBackendType backendType, File baseDir) throws IOException {
+        return createKeyedStateBackend(backendType, baseDir, TtlTimeProvider.DEFAULT);
+    }
+
     public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType)
             throws IOException {
         return createKeyedStateBackend(backendType, null);
     }
 
-    private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend() {
+    private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend(
+            TtlTimeProvider ttlTimeProvider) {
         return new BatchExecutionStateBackend()
                 .createKeyedStateBackend(
                         MockEnvironment.builder().build(),
@@ -119,7 +128,7 @@ public class StateBackendBenchmarkUtils {
                         2,
                         new KeyGroupRange(0, 1),
                         null,
-                        TtlTimeProvider.DEFAULT,
+                        ttlTimeProvider,
                         new UnregisteredMetricsGroup(),
                         Collections.emptyList(),
                         null);
@@ -154,8 +163,8 @@ public class StateBackendBenchmarkUtils {
                         new Path(cpPathFile.getPath()), null, new JobID(), 1024, 4096));
     }
 
-    private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(File rootDir)
-            throws IOException {
+    private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(
+            File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
         File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
         File dbPathFile = prepareDirectory(dbDirName, rootDir);
         ExecutionConfig executionConfig = new ExecutionConfig();
@@ -175,7 +184,7 @@ public class StateBackendBenchmarkUtils {
                         new LocalRecoveryConfig(null),
                         RocksDBPriorityQueueConfig.buildWithPriorityQueueType(
                                 EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB),
-                        TtlTimeProvider.DEFAULT,
+                        ttlTimeProvider,
                         LatencyTrackingStateConfig.disabled(),
                         new UnregisteredMetricsGroup(),
                         Collections.emptyList(),
@@ -189,8 +198,8 @@ public class StateBackendBenchmarkUtils {
         }
     }
 
-    private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(File rootDir)
-            throws IOException {
+    private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(
+            File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
         File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
         KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
         int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
@@ -205,7 +214,7 @@ public class StateBackendBenchmarkUtils {
                         numberOfKeyGroups,
                         keyGroupRange,
                         executionConfig,
-                        TtlTimeProvider.DEFAULT,
+                        ttlTimeProvider,
                         LatencyTrackingStateConfig.disabled(),
                         Collections.emptyList(),
                         AbstractStateBackend.getCompressionDecorator(executionConfig),