Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/04 04:09:48 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #14341: [FLINK-20496][state backends] RocksDB partitioned index/filters option.

Myasuka commented on a change in pull request #14341:
URL: https://github.com/apache/flink/pull/14341#discussion_r551108392



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -162,7 +162,7 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles
 			final long targetFileSize = 256 * 1024 * 1024;
 			final long writeBufferSize = 64 * 1024 * 1024;
 
-			BloomFilter bloomFilter = new BloomFilter();
+			BloomFilter bloomFilter = new BloomFilter(10, false);

Review comment:
       I think we could add a brief comment here describing the switch to the full filter.
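       Such a comment could explain both constructor arguments. In RocksJava, `BloomFilter(bitsPerKey, useBlockBasedBuilder)` with `useBlockBasedBuilder = false` selects the full filter format instead of the older block-based one, and partitioned filters only work with full filters. The sketch below is illustrative only: it uses the textbook Bloom filter approximation (not RocksDB's exact code) to estimate what 10 bits per key buys, and the class name `BloomFilterFpr` is hypothetical.

```java
// Hypothetical helper: textbook Bloom filter false-positive estimate,
// not RocksDB's internal formula.
final class BloomFilterFpr {

    private BloomFilterFpr() {}

    /** Hash probes per key, using the common rule of thumb k = floor(bitsPerKey * ln 2). */
    static int probes(double bitsPerKey) {
        return Math.max(1, (int) (bitsPerKey * Math.log(2)));
    }

    /** Approximate false-positive rate: (1 - e^(-k / bitsPerKey))^k. */
    static double falsePositiveRate(double bitsPerKey) {
        int k = probes(bitsPerKey);
        return Math.pow(1.0 - Math.exp(-k / bitsPerKey), k);
    }

    public static void main(String[] args) {
        double bitsPerKey = 10; // the first argument of new BloomFilter(10, false)
        System.out.printf("probes=%d, fpr=%.4f%n",
            probes(bitsPerKey), falsePositiveRate(bitsPerKey));
    }
}
```

       With these assumptions, 10 bits per key gives roughly a 1% false-positive rate, which is why it is a common default.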

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -127,6 +127,14 @@
 			.withDescription("The approximate size (in bytes) of user data packed per block. " +
 				"RocksDB has default blocksize as '4KB'.");
 
+	public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+		key("state.backend.rocksdb.block.metadata-blocksize")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Approximate size (in bytes) of partitioned metadata packed per block. " +

Review comment:
       Since this option is wrapped in a `MemorySize`, we no longer need to describe it as `in bytes`.
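       The reason is that a `MemorySize` option is set as a string that carries its own unit (for example `4kb` or `64mb`), so the description does not need to fix a unit. The sketch below is a simplified, hypothetical stand-in (`MemorySizeSketch.parseBytes` is not Flink's API) showing how such a value resolves to bytes with 1024-based multipliers; Flink's real `MemorySize` parsing accepts more unit spellings.

```java
import java.util.Locale;

// Hypothetical, simplified stand-in for parsing a memory-size option value;
// not Flink's MemorySize implementation.
final class MemorySizeSketch {

    private MemorySizeSketch() {}

    /** Parses "<number>[unit]" into bytes, using binary (1024-based) multipliers. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        long multiplier;
        switch (unit) {
            case "": case "b": case "bytes": multiplier = 1L; break;
            case "k": case "kb": multiplier = 1L << 10; break;
            case "m": case "mb": multiplier = 1L << 20; break;
            case "g": case "gb": multiplier = 1L << 30; break;
            default: throw new IllegalArgumentException("Unknown unit: " + unit);
        }
        // Fail loudly instead of silently overflowing.
        return Math.multiplyExact(value, multiplier);
    }
}
```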

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -209,4 +223,34 @@ public void close() throws Exception {
 			sharedResources.close();
 		}
 	}
+
+	/**
+	 * Overwrites the configured {@link Filter} if partitioned index/filters are enabled.
+	 * Partitioned filters only work with the full Bloom filter, not the block-based one.
+	 */
+	private void overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
+		Filter filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
+		if (filter != null) {
+			// TODO Once a future RocksDB version exposes the filter's config, build the new filter from the existing config.
+			BloomFilter newFilter = new BloomFilter(10, false);
+			LOG.warn("Overwrite existing filter if '{}' is enabled.", RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);
+			blockBasedTableConfig.setFilter(newFilter);
+			handlesToClose.add(newFilter);
+			IOUtils.closeQuietly(filter);
+			handlesToClose.remove(filter);
+		}
+	}
+
+	@VisibleForTesting
+	protected static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig) {

Review comment:
       This static method could be package-private.
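       For illustration, here is a self-contained model of the method under review with the suggested visibility. `FilterStub`, `TableConfigStub` and `FilterOverwriteSketch` are hypothetical stand-ins for RocksJava's `Filter`, `BlockBasedTableConfig` and the resource container; only the handle bookkeeping is modeled. Leaving out the access modifier makes the static getter package-private, which is enough for a test in the same package.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a native filter handle.
class FilterStub implements AutoCloseable {
    final boolean fullFilter;
    boolean closed;

    FilterStub(boolean fullFilter) { this.fullFilter = fullFilter; }

    @Override
    public void close() { closed = true; }
}

// Hypothetical stand-in for a table config that holds at most one filter.
class TableConfigStub {
    private FilterStub filter;

    void setFilter(FilterStub filter) { this.filter = filter; }

    FilterStub filter() { return filter; }
}

class FilterOverwriteSketch {
    final List<AutoCloseable> handlesToClose = new ArrayList<>();

    /** No access modifier: package-private, visible to same-package tests only. */
    static FilterStub getFilter(TableConfigStub config) {
        return config.filter();
    }

    /** Swaps an existing filter for a full filter and keeps handlesToClose consistent. */
    void overwriteFilterIfExist(TableConfigStub config) {
        FilterStub old = getFilter(config);
        if (old != null) {
            FilterStub replacement = new FilterStub(true);
            config.setFilter(replacement);
            handlesToClose.add(replacement); // track the new native handle
            old.close();                     // release the replaced filter
            handlesToClose.remove(old);      // and stop tracking it
        }
    }
}
```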

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
##########
@@ -50,6 +53,10 @@ public long getWriteBufferManagerCapacity() {
 		return writeBufferManagerCapacity;
 	}
 
+	public boolean isUsingPartitionedIndex() {

Review comment:
       I think this method could be renamed to `isUsingPartitionedIndexFilters`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org