You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/21 04:56:21 UTC
[flink] 03/03: [FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd91affd8040123e2f757b24859b7dad33e09532
Author: Yu Li <li...@apache.org>
AuthorDate: Thu Jun 11 19:32:39 2020 +0800
[FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory
This closes #12673.
---
flink-python/pyflink/datastream/state_backend.py | 8 ++--
.../streaming/state/OptionsFactoryAdapter.java | 55 ++++++++++++++++++++++
.../streaming/state/RocksDBOptionsFactory.java | 28 +----------
.../state/RocksDBOptionsFactoryAdapter.java | 4 +-
.../streaming/state/RocksDBStateBackend.java | 10 ++--
.../state/RocksDBStateBackendConfigTest.java | 11 ++++-
6 files changed, 78 insertions(+), 38 deletions(-)
diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py
index d894137..46ccf66 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -682,11 +682,11 @@ class RocksDBStateBackend(StateBackend):
The options factory must have a default constructor.
"""
gateway = get_gateway()
- JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
+ JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
j_options_factory_clz = load_java_class(options_factory_class_name)
if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
- raise ValueError("The input class not implements OptionsFactory.")
- self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
+ raise ValueError("The input class not implements RocksDBOptionsFactory.")
+ self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance())
def get_options(self):
"""
@@ -695,7 +695,7 @@ class RocksDBStateBackend(StateBackend):
:return: The fully-qualified class name of the options factory in Java.
"""
- j_options_factory = self._j_rocks_db_state_backend.getOptions()
+ j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions()
if j_options_factory is not None:
return j_options_factory.getClass().getName()
else:
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
new file mode 100644
index 0000000..666bc4b
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.util.ArrayList;
+
+/**
+ * A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}.
+ */
+public class OptionsFactoryAdapter implements OptionsFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RocksDBOptionsFactory rocksDBOptionsFactory;
+
+ OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) {
+ this.rocksDBOptionsFactory = rocksDBOptionsFactory;
+ }
+
+ @Override
+ public DBOptions createDBOptions(DBOptions currentOptions) {
+ return rocksDBOptionsFactory.createDBOptions(currentOptions, new ArrayList<>());
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+ return rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>());
+ }
+
+ @VisibleForTesting
+ RocksDBOptionsFactory getRocksDBOptionsFactory() {
+ return rocksDBOptionsFactory;
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..dffa9e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import java.util.ArrayList;
import java.util.Collection;
/**
@@ -32,7 +31,7 @@ import java.util.Collection;
* <p>A typical pattern to use this OptionsFactory is as follows:
*
* <pre>{@code
- * rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
+ * rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
*
* public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
* return currentOptions.setMaxOpenFiles(1024);
@@ -49,8 +48,7 @@ import java.util.Collection;
* });
* }</pre>
*/
-@SuppressWarnings("deprecation")
-public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
+public interface RocksDBOptionsFactory extends java.io.Serializable {
/**
* This method should set the additional options on top of the current options object.
@@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
return nativeMetricOptions;
}
-
- // ------------------------------------------------------------------------
- // for compatibility
- // ------------------------------------------------------------------------
-
- /**
- * Do not override these methods, they are only to maintain interface compatibility with
- * prior versions. They will be removed in one of the next versions.
- */
- @Override
- default DBOptions createDBOptions(DBOptions currentOptions) {
- return createDBOptions(currentOptions, new ArrayList<>());
- }
-
- /**
- * Do not override these methods, they are only to maintain interface compatibility with
- * prior versions. They will be removed in one of the next versions.
- */
- @Override
- default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
- return createColumnOptions(currentOptions, new ArrayList<>());
- }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
index 36e8c3f..7efc32d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
@@ -67,9 +67,9 @@ final class RocksDBOptionsFactoryAdapter implements ConfigurableRocksDBOptionsFa
}
@Nullable
- public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
+ static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
return factory instanceof RocksDBOptionsFactoryAdapter
? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
- : factory;
+ : new OptionsFactoryAdapter(factory);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 873f57d..e9d27b6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -861,9 +861,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
*/
@Deprecated
public void setOptions(OptionsFactory optionsFactory) {
- this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
- ? (RocksDBOptionsFactory) optionsFactory
- : new RocksDBOptionsFactoryAdapter(optionsFactory);
+ this.rocksDbOptionsFactory = new RocksDBOptionsFactoryAdapter(optionsFactory);
}
/**
@@ -874,7 +872,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
*/
@Deprecated
public OptionsFactory getOptions() {
- return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+ if (rocksDbOptionsFactory == null) {
+ return null;
+ } else {
+ return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+ }
}
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2167a00..1f691bf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -552,7 +552,14 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());
assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
- assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
+ OptionsFactory optionsFactory = rocksDbBackend.getOptions();
+ if (optionsFactory instanceof OptionsFactoryAdapter) {
+ RocksDBOptionsFactory rocksDBOptionsFactory =
+ ((OptionsFactoryAdapter) optionsFactory).getRocksDBOptionsFactory();
+ assertTrue(rocksDBOptionsFactory instanceof TestOptionsFactory);
+ } else {
+ assertTrue(optionsFactory instanceof TestOptionsFactory);
+ }
try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
DBOptions dbOptions = optionsContainer.getDbOptions();
@@ -640,7 +647,7 @@ public class RocksDBStateBackendConfigTest {
assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
- assertEquals(original.getOptions(), copy.getOptions());
+ assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();