You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2021/11/29 02:44:34 UTC

[flink] branch release-1.14 updated: [FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new f7c0381  [FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled
f7c0381 is described below

commit f7c0381eb202819b9c1ecc1e3693b31377fe2a9a
Author: PengFei Li <lp...@gmail.com>
AuthorDate: Fri Nov 26 14:26:04 2021 +0800

    [FLINK-23798][state] Avoid using reflection to get filter when partition filter is enabled
---
 .../streaming/state/RocksDBResourceContainer.java  | 24 ++--------------------
 .../state/RocksDBResourceContainerTest.java        |  3 +--
 2 files changed, 3 insertions(+), 24 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 3be8382..d5917be 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -225,33 +223,15 @@ public final class RocksDBResourceContainer implements AutoCloseable {
      * worked in full bloom filter, not blocked based.
      */
     private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
-        Filter filter = null;
-        try {
-            filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            LOG.warn(
-                    "Reflection exception occurred when getting filter from BlockBasedTableConfig, disable partition index filters!");
-            return false;
-        }
-        if (filter != null) {
+        if (blockBasedTableConfig.filterPolicy() != null) {
             // TODO Can get filter's config in the future RocksDB version, and build new filter use
             // existing config.
             BloomFilter newFilter = new BloomFilter(10, false);
             LOG.info(
                     "Existing filter has been overwritten to full filters since partitioned index filters is enabled.");
-            blockBasedTableConfig.setFilter(newFilter);
+            blockBasedTableConfig.setFilterPolicy(newFilter);
             handlesToClose.add(newFilter);
         }
         return true;
     }
-
-    @VisibleForTesting
-    static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig)
-            throws NoSuchFieldException, IllegalAccessException {
-        Field filterField = blockBasedTableConfig.getClass().getDeclaredField("filterPolicy");
-        filterField.setAccessible(true);
-        Object filter = filterField.get(blockBasedTableConfig);
-        filterField.setAccessible(false);
-        return filter == null ? null : (Filter) filter;
-    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 13b6a0a..ba889395 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -44,7 +44,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 
-import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -318,7 +317,7 @@ public class RocksDBResourceContainerTest {
             assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
             assertThat(actual.partitionFilters(), is(true));
             assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
-            assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter));
+            assertThat(actual.filterPolicy(), not(blockBasedFilter));
         }
         assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle());
     }