You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2021/11/29 02:44:34 UTC
[flink] branch release-1.14 updated: [FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new f7c0381 [FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled
f7c0381 is described below
commit f7c0381eb202819b9c1ecc1e3693b31377fe2a9a
Author: PengFei Li <lp...@gmail.com>
AuthorDate: Fri Nov 26 14:26:04 2021 +0800
[FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled
---
.../streaming/state/RocksDBResourceContainer.java | 24 ++--------------------
.../state/RocksDBResourceContainerTest.java | 3 +--
2 files changed, 3 insertions(+), 24 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 3be8382..d5917be 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -225,33 +223,15 @@ public final class RocksDBResourceContainer implements AutoCloseable {
* worked in full bloom filter, not blocked based.
*/
private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
- Filter filter = null;
- try {
- filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- LOG.warn(
- "Reflection exception occurred when getting filter from BlockBasedTableConfig, disable partition index filters!");
- return false;
- }
- if (filter != null) {
+ if (blockBasedTableConfig.filterPolicy() != null) {
// TODO Can get filter's config in the future RocksDB version, and build new filter use
// existing config.
BloomFilter newFilter = new BloomFilter(10, false);
LOG.info(
"Existing filter has been overwritten to full filters since partitioned index filters is enabled.");
- blockBasedTableConfig.setFilter(newFilter);
+ blockBasedTableConfig.setFilterPolicy(newFilter);
handlesToClose.add(newFilter);
}
return true;
}
-
- @VisibleForTesting
- static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig)
- throws NoSuchFieldException, IllegalAccessException {
- Field filterField = blockBasedTableConfig.getClass().getDeclaredField("filterPolicy");
- filterField.setAccessible(true);
- Object filter = filterField.get(blockBasedTableConfig);
- filterField.setAccessible(false);
- return filter == null ? null : (Filter) filter;
- }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 13b6a0a..ba889395 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -44,7 +44,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -318,7 +317,7 @@ public class RocksDBResourceContainerTest {
assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
assertThat(actual.partitionFilters(), is(true));
assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
- assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter));
+ assertThat(actual.filterPolicy(), not(blockBasedFilter));
}
assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle());
}