You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/21 04:56:21 UTC

[flink] 03/03: [FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd91affd8040123e2f757b24859b7dad33e09532
Author: Yu Li <li...@apache.org>
AuthorDate: Thu Jun 11 19:32:39 2020 +0800

    [FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory
    
    This closes #12673.
---
 flink-python/pyflink/datastream/state_backend.py   |  8 ++--
 .../streaming/state/OptionsFactoryAdapter.java     | 55 ++++++++++++++++++++++
 .../streaming/state/RocksDBOptionsFactory.java     | 28 +----------
 .../state/RocksDBOptionsFactoryAdapter.java        |  4 +-
 .../streaming/state/RocksDBStateBackend.java       | 10 ++--
 .../state/RocksDBStateBackendConfigTest.java       | 11 ++++-
 6 files changed, 78 insertions(+), 38 deletions(-)

diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py
index d894137..46ccf66 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -682,11 +682,11 @@ class RocksDBStateBackend(StateBackend):
                                            The options factory must have a default constructor.
         """
         gateway = get_gateway()
-        JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
+        JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
         j_options_factory_clz = load_java_class(options_factory_class_name)
         if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
-            raise ValueError("The input class not implements OptionsFactory.")
-        self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
+            raise ValueError("The input class not implements RocksDBOptionsFactory.")
+        self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance())
 
     def get_options(self):
         """
@@ -695,7 +695,7 @@ class RocksDBStateBackend(StateBackend):
 
         :return: The fully-qualified class name of the options factory in Java.
         """
-        j_options_factory = self._j_rocks_db_state_backend.getOptions()
+        j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions()
         if j_options_factory is not None:
             return j_options_factory.getClass().getName()
         else:
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
new file mode 100644
index 0000000..666bc4b
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactoryAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.util.ArrayList;
+
+/**
+ * A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}.
+ */
+public class OptionsFactoryAdapter implements OptionsFactory {
+
+	private static final long serialVersionUID = 1L;
+
+	private final RocksDBOptionsFactory rocksDBOptionsFactory;
+
+	OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) {
+		this.rocksDBOptionsFactory = rocksDBOptionsFactory;
+	}
+
+	@Override
+	public DBOptions createDBOptions(DBOptions currentOptions) {
+		return rocksDBOptionsFactory.createDBOptions(currentOptions, new ArrayList<>());
+	}
+
+	@Override
+	public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+		return rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>());
+	}
+
+	@VisibleForTesting
+	RocksDBOptionsFactory getRocksDBOptionsFactory() {
+		return rocksDBOptionsFactory;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..dffa9e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -32,7 +31,7 @@ import java.util.Collection;
  * <p>A typical pattern to use this OptionsFactory is as follows:
  *
  * <pre>{@code
- * rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
+ * rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
  *
  *		public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
  *			return currentOptions.setMaxOpenFiles(1024);
@@ -49,8 +48,7 @@ import java.util.Collection;
  * });
  * }</pre>
  */
-@SuppressWarnings("deprecation")
-public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
+public interface RocksDBOptionsFactory extends java.io.Serializable {
 
 	/**
 	 * This method should set the additional options on top of the current options object.
@@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
 	default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
 		return nativeMetricOptions;
 	}
-
-	// ------------------------------------------------------------------------
-	//  for compatibility
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Do not override these methods, they are only to maintain interface compatibility with
-	 * prior versions. They will be removed in one of the next versions.
-	 */
-	@Override
-	default DBOptions createDBOptions(DBOptions currentOptions) {
-		return createDBOptions(currentOptions, new ArrayList<>());
-	}
-
-	/**
-	 * Do not override these methods, they are only to maintain interface compatibility with
-	 * prior versions. They will be removed in one of the next versions.
-	 */
-	@Override
-	default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-		return createColumnOptions(currentOptions, new ArrayList<>());
-	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
index 36e8c3f..7efc32d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
@@ -67,9 +67,9 @@ final class RocksDBOptionsFactoryAdapter implements ConfigurableRocksDBOptionsFa
 	}
 
 	@Nullable
-	public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
+	static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
 		return factory instanceof RocksDBOptionsFactoryAdapter
 				? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
-				: factory;
+				: new OptionsFactoryAdapter(factory);
 	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 873f57d..e9d27b6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -861,9 +861,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 */
 	@Deprecated
 	public void setOptions(OptionsFactory optionsFactory) {
-		this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
-				? (RocksDBOptionsFactory) optionsFactory
-				: new RocksDBOptionsFactoryAdapter(optionsFactory);
+		this.rocksDbOptionsFactory = new RocksDBOptionsFactoryAdapter(optionsFactory);
 	}
 
 	/**
@@ -874,7 +872,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 */
 	@Deprecated
 	public OptionsFactory getOptions() {
-		return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+		if (rocksDbOptionsFactory == null) {
+			return null;
+		} else {
+			return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
+		}
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2167a00..1f691bf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -552,7 +552,14 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());
 
 		assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
-		assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
+		OptionsFactory optionsFactory = rocksDbBackend.getOptions();
+		if (optionsFactory instanceof OptionsFactoryAdapter) {
+			RocksDBOptionsFactory rocksDBOptionsFactory =
+				((OptionsFactoryAdapter) optionsFactory).getRocksDBOptionsFactory();
+			assertTrue(rocksDBOptionsFactory instanceof TestOptionsFactory);
+		} else {
+			assertTrue(optionsFactory instanceof TestOptionsFactory);
+		}
 
 		try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
 			DBOptions dbOptions = optionsContainer.getDbOptions();
@@ -640,7 +647,7 @@ public class RocksDBStateBackendConfigTest {
 
 		assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
 		assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
-		assertEquals(original.getOptions(), copy.getOptions());
+		assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
 		assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
 
 		FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();