You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:37 UTC
[8/8] flink git commit: [hotfix] Consolidate RocksDB configuration
options in RocksDBOptions
[hotfix] Consolidate RocksDB configuration options in RocksDBOptions
Rename from backend.rocksdb.priority_queue_state_type into state.backend.rocksdb.timer-service.impl
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4b4cb70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4b4cb70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4b4cb70
Branch: refs/heads/master
Commit: a4b4cb70a5a26acd31294041c8a15479e69ef3bc
Parents: a88d6ef
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:29:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200
----------------------------------------------------------------------
.../generated/checkpointing_configuration.html | 5 ---
.../generated/rocks_db_configuration.html | 21 +++++++++
.../configuration/CheckpointingOptions.java | 11 -----
flink-docs/pom.xml | 5 +++
.../ConfigOptionsDocGenerator.java | 3 +-
.../streaming/state/RockDBBackendOptions.java | 38 ----------------
.../contrib/streaming/state/RocksDBOptions.java | 47 ++++++++++++++++++++
.../streaming/state/RocksDBStateBackend.java | 6 +--
.../state/RocksDBStateBackendFactoryTest.java | 6 +--
9 files changed, 81 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 7a2791f..8f5ce7b 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -33,11 +33,6 @@
<td></td>
</tr>
<tr>
- <td><h5>state.backend.rocksdb.localdir</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
- </tr>
- <tr>
<td><h5>state.checkpoints.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/rocks_db_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
new file mode 100644
index 0000000..57b9511
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -0,0 +1,21 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 65%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>state.backend.rocksdb.localdir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
+ <td style="word-wrap: break-word;">"HEAP"</td>
+ <td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
+ </tr>
+ </tbody>
+</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 60b7613..6557a9f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -115,15 +115,4 @@ public class CheckpointingOptions {
.defaultValue(1024)
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
-
- // ------------------------------------------------------------------------
- // Options specific to the RocksDB state backend
- // ------------------------------------------------------------------------
-
- /** The local directory (on the TaskManager) where RocksDB puts its files. */
- public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
- .key("state.backend.rocksdb.localdir")
- .noDefaultValue()
- .withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
- .withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 2135dea..b7709f5 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -83,6 +83,11 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 953122f..d333719 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -59,7 +59,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
- new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
+ new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+ new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
};
static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
deleted file mode 100644
index ede45e3..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Configuration options for the RocksDB backend.
- */
-public class RockDBBackendOptions {
-
- /**
- * Choice of implementation for priority queue state (e.g. timers).
- */
- public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
- .key("backend.rocksdb.priority_queue_state_type")
- .defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
- .withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are" +
- "either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
- RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for in implementation based on RocksDB.");
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
new file mode 100644
index 0000000..37eb6cf
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RocksDBOptions {
+
+ /** The local directory (on the TaskManager) where RocksDB puts its files. */
+ public static final ConfigOption<String> LOCAL_DIRECTORIES = ConfigOptions
+ .key("state.backend.rocksdb.localdir")
+ .noDefaultValue()
+ .withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
+ .withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
+
+ /**
+ * Choice of timer service implementation.
+ */
+ public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
+ .key("state.backend.rocksdb.timer-service.impl")
+ .defaultValue(HEAP.name())
+ .withDescription(String.format("This determines the timer service implementation. Options are either %s " +
+ "(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1794e17..58e8de6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -60,7 +60,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
-import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
+import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
- final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+ final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
@@ -281,7 +281,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.localRocksDbDirectories = original.localRocksDbDirectories;
}
else {
- final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+ final String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
if (rocksdbLocalPaths != null) {
String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 7612c4c..9e16dea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -82,14 +82,14 @@ public class RocksDBStateBackendFactoryTest {
config1.setString(backendKey, "rocksdb");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
- config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+ config1.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
final Configuration config2 = new Configuration();
config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
- config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+ config2.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
@@ -143,7 +143,7 @@ public class RocksDBStateBackendFactoryTest {
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental); // this should not be picked up
- config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up
+ config.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up
final StateBackend loadedBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);