You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/04 09:58:03 UTC
[flink] branch release-1.8 updated: [FLINK-11193][State] Fix RocksDB
timer service factory configuration option to be settable per job
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 60d9b96 [FLINK-11193][State] Fix RocksDB timer service factory configuration option to be settable per job
60d9b96 is described below
commit 60d9b96456f142f8d18d5882016840a00159403e
Author: Aitozi <10...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800
[FLINK-11193][State] Fix RocksDB timer service factory configuration option to be settable per job
This closes #8479.
---
.../streaming/state/RocksDBStateBackend.java | 32 +++++++++++++++-----
.../state/RocksDBStateBackendConfigTest.java | 34 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c5604e5..cfac3b2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -147,7 +147,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
private TernaryBoolean enableTtlCompactionFilter;
/** This determines the type of priority queue state. */
- private final PriorityQueueStateType priorityQueueStateType;
+ @Nullable
+ private PriorityQueueStateType priorityQueueStateType;
/** The default rocksdb metrics options. */
private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -265,8 +266,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
- // for now, we use still the heap-based implementation as default
- this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
}
@@ -314,10 +313,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
- final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
- this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
- PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+ if (null == original.priorityQueueStateType) {
+ this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+ } else {
+ this.priorityQueueStateType = original.priorityQueueStateType;
+ }
// configure local directories
if (original.localRocksDbDirectories != null) {
@@ -507,7 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
keyGroupRange,
executionConfig,
localRecoveryConfig,
- priorityQueueStateType,
+ getPriorityQueueStateType(),
ttlTimeProvider,
metricGroup,
stateHandles,
@@ -716,6 +716,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
enableTtlCompactionFilter = TernaryBoolean.TRUE;
}
+ /**
+ * Gets the type of the priority queue state. It falls back to the default value if no type has been explicitly set.
+ * @return The type of the priority queue state.
+ */
+ public PriorityQueueStateType getPriorityQueueStateType() {
+ return priorityQueueStateType == null ?
+ PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType;
+ }
+
+ /**
+ * Sets the type of the priority queue state. An explicitly set type is kept when the backend is configured, taking precedence over the timer service factory configuration option.
+ */
+ public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) {
+ this.priorityQueueStateType = checkNotNull(priorityQueueStateType);
+ }
+
// ------------------------------------------------------------------------
// Parametrize with RocksDB Options
// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index c00ffdb..d4ef2e3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -60,6 +62,7 @@ import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertArrayEquals;
@@ -172,6 +175,37 @@ public class RocksDBStateBackendConfigTest {
keyedBackend.dispose();
}
+ /**
+ * Validates that a priority queue state type set in application code takes precedence over the one configured in flink-conf.yaml.
+ */
+ @Test
+ public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+ final MockEnvironment env = new MockEnvironmentBuilder().build();
+
+ // priorityQueueStateType of the job backend
+ final RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+ backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
+
+ // priorityQueueStateType in the cluster config
+ final Configuration configFromConfFile = new Configuration();
+ configFromConfFile.setString(
+ RocksDBOptions.TIMER_SERVICE_FACTORY,
+ RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+
+ // configure final backend from job and cluster config
+ final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
+ configFromConfFile,
+ Thread.currentThread().getContextClassLoader());
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
+
+ // priorityQueueStateType of the job backend should be preserved
+ assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
+
+ keyedBackend.close();
+ keyedBackend.dispose();
+ env.close();
+ }
+
@Test
public void testStoragePathWithFilePrefix() throws Exception {
final File folder = tempFolder.newFolder();