You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/04 09:58:03 UTC

[flink] branch release-1.8 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 60d9b96  [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job
60d9b96 is described below

commit 60d9b96456f142f8d18d5882016840a00159403e
Author: Aitozi <10...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800

    [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job
    
    This closes #8479.
---
 .../streaming/state/RocksDBStateBackend.java       | 32 +++++++++++++++-----
 .../state/RocksDBStateBackendConfigTest.java       | 34 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c5604e5..cfac3b2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -147,7 +147,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	private TernaryBoolean enableTtlCompactionFilter;
 
 	/** This determines the type of priority queue state. */
-	private final PriorityQueueStateType priorityQueueStateType;
+	@Nullable
+	private PriorityQueueStateType priorityQueueStateType;
 
 	/** The default rocksdb metrics options. */
 	private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -265,8 +266,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
 		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
 		this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
-		// for now, we use still the heap-based implementation as default
-		this.priorityQueueStateType = PriorityQueueStateType.HEAP;
 		this.defaultMetricOptions = new RocksDBNativeMetricOptions();
 		this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
 	}
@@ -314,10 +313,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
 			.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
-
-		this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
-			PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+		if (null == original.priorityQueueStateType) {
+			this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+		} else {
+			this.priorityQueueStateType = original.priorityQueueStateType;
+		}
 
 		// configure local directories
 		if (original.localRocksDbDirectories != null) {
@@ -507,7 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			keyGroupRange,
 			executionConfig,
 			localRecoveryConfig,
-			priorityQueueStateType,
+			getPriorityQueueStateType(),
 			ttlTimeProvider,
 			metricGroup,
 			stateHandles,
@@ -716,6 +716,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		enableTtlCompactionFilter = TernaryBoolean.TRUE;
 	}
 
+	/**
+	 * Gets the type of the priority queue state. Falls back to the default value of the timer service factory option if no type was explicitly set.
+	 * @return The type of the priority queue state.
+	 */
+	public PriorityQueueStateType getPriorityQueueStateType() {
+		return priorityQueueStateType == null ?
+			PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType;
+	}
+
+	/**
+	 * Sets the type of the priority queue state explicitly. The given type must not be null.
+	 */
+	public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) {
+		this.priorityQueueStateType = checkNotNull(priorityQueueStateType);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Parametrize with RocksDB Options
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index c00ffdb..d4ef2e3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -60,6 +62,7 @@ import java.io.IOException;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertArrayEquals;
@@ -172,6 +175,37 @@ public class RocksDBStateBackendConfigTest {
 		keyedBackend.dispose();
 	}
 
+	/**
+	 * Validates that a priority queue state type set on the backend in application code takes precedence over the timer service factory set in the cluster configuration.
+	 */
+	@Test
+	public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
+		final MockEnvironment env = new MockEnvironmentBuilder().build();
+
+		// priorityQueueStateType of the job backend
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+		backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
+
+		// priorityQueueStateType in the cluster config
+		final Configuration configFromConfFile = new Configuration();
+		configFromConfFile.setString(
+			RocksDBOptions.TIMER_SERVICE_FACTORY,
+			RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+
+		// configure final backend from job and cluster config
+		final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
+			configFromConfFile,
+			Thread.currentThread().getContextClassLoader());
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
+
+		// priorityQueueStateType of the job backend should be preserved
+		assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
+
+		keyedBackend.close();
+		keyedBackend.dispose();
+		env.close();
+	}
+
 	@Test
 	public void testStoragePathWithFilePrefix() throws Exception {
 		final File folder = tempFolder.newFolder();