You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/06/17 12:34:03 UTC
[flink] 01/02: Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59486908060cbb04c9b34d800a935758d06d1c69
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 10:13:19 2020 +0200
Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"
This reverts commit f1250625b2ade530fa2619d6e1bb734832748d31.
---
.../streaming/state/RocksDBKeyedStateBackend.java | 8 +++--
.../state/RocksDBKeyedStateBackendBuilder.java | 17 +++++++++--
.../streaming/state/RocksDBOptionsFactory.java | 34 ----------------------
.../streaming/state/RocksDBResourceContainer.java | 34 ----------------------
.../state/RocksDBResourceContainerTest.java | 14 ---------
5 files changed, 21 insertions(+), 86 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 24b897a..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
+ WriteOptions writeOptions,
+ ReadOptions readOptions,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
@@ -257,8 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;
- this.writeOptions = optionsContainer.getWriteOptions();
- this.readOptions = optionsContainer.getReadOptions();
+ this.writeOptions = writeOptions;
+ this.readOptions = readOptions;
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
this.writeBatchSize = writeBatchSize;
this.db = db;
@@ -368,6 +370,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyOptions.forEach(IOUtils::closeQuietly);
IOUtils.closeQuietly(optionsContainer);
+ IOUtils.closeQuietly(readOptions);
+ IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 1846bde..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,9 @@ import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,6 +243,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
ColumnFamilyHandle defaultColumnFamilyHandle = null;
RocksDBNativeMetricMonitor nativeMetricMonitor = null;
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
+ // The write options to use in the states.
+ WriteOptions writeOptions = null;
+ ReadOptions readOptions = null;
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -278,7 +283,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
}
}
- writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
+ writeOptions = new WriteOptions().setDisableWAL(true);
+ readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+ writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
// only now we can be certain that the key serializer used in the builder is final.
@@ -294,6 +301,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
keyGroupPrefixBytes,
kvStateInformation,
db,
+ readOptions,
writeBatchWrapper,
nativeMetricMonitor);
} catch (Throwable e) {
@@ -313,6 +321,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
+ IOUtils.closeQuietly(readOptions);
+ IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
try {
@@ -343,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
this.executionConfig,
this.ttlTimeProvider,
db,
+ writeOptions,
+ readOptions,
kvStateInformation,
keyGroupPrefixBytes,
cancelStreamRegistryForBackend,
@@ -471,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
int keyGroupPrefixBytes,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
+ ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
@@ -485,7 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
numberOfKeyGroups,
kvStateInformation,
db,
- optionsContainer.getReadOptions(),
+ readOptions,
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 5ee5098..a5fd1e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.WriteOptions;
import java.util.ArrayList;
import java.util.Collection;
@@ -95,38 +93,6 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
return nativeMetricOptions;
}
- /**
- * This method should set the additional options on top of the current options object.
- * The current options object may contain pre-defined options based on flags that have
- * been configured on the state backend.
- *
- * <p>It is important to set the options on the current object and return the result from
- * the setter methods, otherwise the pre-defined options may get lost.
- *
- * @param currentOptions The options object with the pre-defined options.
- * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
- * @return The options object on which the additional options are set.
- */
- default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return currentOptions;
- }
-
- /**
- * This method should set the additional options on top of the current options object.
- * The current options object may contain pre-defined options based on flags that have
- * been configured on the state backend.
- *
- * <p>It is important to set the options on the current object and return the result from
- * the setter methods, otherwise the pre-defined options may get lost.
- *
- * @param currentOptions The options object with the pre-defined options.
- * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
- * @return The options object on which the additional options are set.
- */
- default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return currentOptions;
- }
-
// ------------------------------------------------------------------------
// for compatibility
// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 8c4d957..c5227a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,9 +26,7 @@ import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
-import org.rocksdb.WriteOptions;
import javax.annotation.Nullable;
@@ -140,38 +138,6 @@ public final class RocksDBResourceContainer implements AutoCloseable {
return opt;
}
- /**
- * Gets the RocksDB {@link WriteOptions} to be used for write operations.
- */
- public WriteOptions getWriteOptions() {
- // Disable WAL by default
- WriteOptions opt = new WriteOptions().setDisableWAL(true);
- handlesToClose.add(opt);
-
- // add user-defined options factory, if specified
- if (optionsFactory != null) {
- opt = optionsFactory.createWriteOptions(opt, handlesToClose);
- }
-
- return opt;
- }
-
- /**
- * Gets the RocksDB {@link ReadOptions} to be used for read operations.
- */
- public ReadOptions getReadOptions() {
- // We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
- ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
- handlesToClose.add(opt);
-
- // add user-defined options factory, if specified
- if (optionsFactory != null) {
- opt = optionsFactory.createReadOptions(opt, handlesToClose);
- }
-
- return opt;
- }
-
RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
return optionsFactory == null
? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index eb76e62..73d4bab 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,9 +31,7 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.ReadOptions;
import org.rocksdb.WriteBufferManager;
-import org.rocksdb.WriteOptions;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -248,16 +246,4 @@ public class RocksDBResourceContainerTest {
assertThat(cache.isOwningHandle(), is(false));
assertThat(wbm.isOwningHandle(), is(false));
}
-
- @Test
- public void testFreeWriteReadOptionsAfterClose() throws Exception {
- RocksDBResourceContainer container = new RocksDBResourceContainer();
- WriteOptions writeOptions = container.getWriteOptions();
- ReadOptions readOptions = container.getReadOptions();
- assertThat(writeOptions.isOwningHandle(), is(true));
- assertThat(readOptions.isOwningHandle(), is(true));
- container.close();
- assertThat(writeOptions.isOwningHandle(), is(false));
- assertThat(readOptions.isOwningHandle(), is(false));
- }
}