You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2023/12/26 05:45:04 UTC
(flink) 01/02: [FLINK-30535] Customize TtlTimeProvider in state benchmarks
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8c2f9f8f6824137d14715be9ff3c714d565da86a
Author: Zakelly <za...@gmail.com>
AuthorDate: Sun Dec 24 15:13:05 2023 +0800
[FLINK-30535] Customize TtlTimeProvider in state benchmarks
---
.../benchmark/StateBackendBenchmarkUtils.java | 37 ++++++++++++++--------
1 file changed, 23 insertions(+), 14 deletions(-)
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
index 09c15a7eb14..63199d9b4ca 100644
--- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java
@@ -83,33 +83,42 @@ public class StateBackendBenchmarkUtils {
private static File rootDir;
public static KeyedStateBackend<Long> createKeyedStateBackend(
- StateBackendType backendType, File baseDir) throws IOException {
+ StateBackendType backendType, File baseDir, TtlTimeProvider ttlTimeProvider)
+ throws IOException {
switch (backendType) {
case HEAP:
rootDir = prepareDirectory(rootDirName, baseDir);
- return createHeapKeyedStateBackend(rootDir);
+ return createHeapKeyedStateBackend(rootDir, ttlTimeProvider);
case ROCKSDB:
rootDir = prepareDirectory(rootDirName, baseDir);
- return createRocksDBKeyedStateBackend(rootDir);
+ return createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider);
case HEAP_CHANGELOG:
rootDir = prepareDirectory(rootDirName, baseDir);
- return createChangelogKeyedStateBackend(createHeapKeyedStateBackend(rootDir));
+ return createChangelogKeyedStateBackend(
+ createHeapKeyedStateBackend(rootDir, ttlTimeProvider));
case ROCKSDB_CHANGELOG:
rootDir = prepareDirectory(rootDirName, baseDir);
- return createChangelogKeyedStateBackend(createRocksDBKeyedStateBackend(rootDir));
+ return createChangelogKeyedStateBackend(
+ createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider));
case BATCH_EXECUTION:
- return createBatchExecutionStateBackend();
+ return createBatchExecutionStateBackend(ttlTimeProvider);
default:
throw new IllegalArgumentException("Unknown backend type: " + backendType);
}
}
+ public static KeyedStateBackend<Long> createKeyedStateBackend(
+ StateBackendType backendType, File baseDir) throws IOException {
+ return createKeyedStateBackend(backendType, baseDir, TtlTimeProvider.DEFAULT);
+ }
+
public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType)
throws IOException {
return createKeyedStateBackend(backendType, null);
}
- private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend() {
+ private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend(
+ TtlTimeProvider ttlTimeProvider) {
return new BatchExecutionStateBackend()
.createKeyedStateBackend(
MockEnvironment.builder().build(),
@@ -119,7 +128,7 @@ public class StateBackendBenchmarkUtils {
2,
new KeyGroupRange(0, 1),
null,
- TtlTimeProvider.DEFAULT,
+ ttlTimeProvider,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
null);
@@ -154,8 +163,8 @@ public class StateBackendBenchmarkUtils {
new Path(cpPathFile.getPath()), null, new JobID(), 1024, 4096));
}
- private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(File rootDir)
- throws IOException {
+ private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(
+ File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
File dbPathFile = prepareDirectory(dbDirName, rootDir);
ExecutionConfig executionConfig = new ExecutionConfig();
@@ -175,7 +184,7 @@ public class StateBackendBenchmarkUtils {
new LocalRecoveryConfig(null),
RocksDBPriorityQueueConfig.buildWithPriorityQueueType(
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB),
- TtlTimeProvider.DEFAULT,
+ ttlTimeProvider,
LatencyTrackingStateConfig.disabled(),
new UnregisteredMetricsGroup(),
Collections.emptyList(),
@@ -189,8 +198,8 @@ public class StateBackendBenchmarkUtils {
}
}
- private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(File rootDir)
- throws IOException {
+ private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(
+ File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
@@ -205,7 +214,7 @@ public class StateBackendBenchmarkUtils {
numberOfKeyGroups,
keyGroupRange,
executionConfig,
- TtlTimeProvider.DEFAULT,
+ ttlTimeProvider,
LatencyTrackingStateConfig.disabled(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),