You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:37 UTC

[8/8] flink git commit: [hotfix] Consolidate RocksDB configuration options in RocksDBOptions

[hotfix] Consolidate RocksDB configuration options in RocksDBOptions

Rename from backend.rocksdb.priority_queue_state_type into state.backend.rocksdb.timer-service.impl


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4b4cb70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4b4cb70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4b4cb70

Branch: refs/heads/master
Commit: a4b4cb70a5a26acd31294041c8a15479e69ef3bc
Parents: a88d6ef
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:29:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../generated/checkpointing_configuration.html  |  5 ---
 .../generated/rocks_db_configuration.html       | 21 +++++++++
 .../configuration/CheckpointingOptions.java     | 11 -----
 flink-docs/pom.xml                              |  5 +++
 .../ConfigOptionsDocGenerator.java              |  3 +-
 .../streaming/state/RockDBBackendOptions.java   | 38 ----------------
 .../contrib/streaming/state/RocksDBOptions.java | 47 ++++++++++++++++++++
 .../streaming/state/RocksDBStateBackend.java    |  6 +--
 .../state/RocksDBStateBackendFactoryTest.java   |  6 +--
 9 files changed, 81 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 7a2791f..8f5ce7b 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -33,11 +33,6 @@
             <td></td>
         </tr>
         <tr>
-            <td><h5>state.backend.rocksdb.localdir</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
-        </tr>
-        <tr>
             <td><h5>state.checkpoints.dir</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/rocks_db_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
new file mode 100644
index 0000000..57b9511
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -0,0 +1,21 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>state.backend.rocksdb.localdir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
+            <td style="word-wrap: break-word;">"HEAP"</td>
+            <td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 60b7613..6557a9f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -115,15 +115,4 @@ public class CheckpointingOptions {
 			.defaultValue(1024)
 			.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
 				" inline in the root checkpoint metadata file.");
-
-	// ------------------------------------------------------------------------
-	//  Options specific to the RocksDB state backend
-	// ------------------------------------------------------------------------
-
-	/** The local directory (on the TaskManager) where RocksDB puts its files. */
-	public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
-			.key("state.backend.rocksdb.localdir")
-			.noDefaultValue()
-			.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
-			.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 2135dea..b7709f5 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -83,6 +83,11 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 
 		<dependency>
 			<groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 953122f..d333719 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -59,7 +59,8 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
-		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
+		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+		new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
 	};
 
 	static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
deleted file mode 100644
index ede45e3..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Configuration options for the RocksDB backend.
- */
-public class RockDBBackendOptions {
-
-	/**
-	 * Choice of implementation for priority queue state (e.g. timers).
-	 */
-	public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
-		.key("backend.rocksdb.priority_queue_state_type")
-		.defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
-		.withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are" +
-			"either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
-			RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for in implementation based on RocksDB.");
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
new file mode 100644
index 0000000..37eb6cf
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RocksDBOptions {
+
+	/** The local directory (on the TaskManager) where RocksDB puts its files. */
+	public static final ConfigOption<String> LOCAL_DIRECTORIES = ConfigOptions
+		.key("state.backend.rocksdb.localdir")
+		.noDefaultValue()
+		.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
+		.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
+
+	/**
+	 * Choice of timer service implementation.
+	 */
+	public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
+		.key("state.backend.rocksdb.timer-service.impl")
+		.defaultValue(HEAP.name())
+		.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
+			"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1794e17..58e8de6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -60,7 +60,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
+import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
 			config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
-		final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
 
 		this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
 			PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
@@ -281,7 +281,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			this.localRocksDbDirectories = original.localRocksDbDirectories;
 		}
 		else {
-			final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+			final String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
 			if (rocksdbLocalPaths != null) {
 				String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 7612c4c..9e16dea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -82,14 +82,14 @@ public class RocksDBStateBackendFactoryTest {
 		config1.setString(backendKey, "rocksdb");
 		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config1.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
 		config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
 
 		final Configuration config2 = new Configuration();
 		config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
 		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config2.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
 		config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
 
 		StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
@@ -143,7 +143,7 @@ public class RocksDBStateBackendFactoryTest {
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
 		config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental);  // this should not be picked up
-		config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
+		config.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
 
 		final StateBackend loadedBackend =
 				StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);