Posted to commits@parquet.apache.org by ga...@apache.org on 2019/03/19 09:36:10 UTC

[parquet-mr] branch bloom-filter updated: PARQUET-1391: Integrate Bloom filter logic (#619)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/bloom-filter by this push:
     new dd7e655  PARQUET-1391: Integrate Bloom filter logic (#619)
dd7e655 is described below

commit dd7e655de908cb4b35e1ea94f5b54525865dce6d
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Tue Mar 19 17:36:02 2019 +0800

    PARQUET-1391: Integrate Bloom filter logic (#619)
---
 .../values/bloomfilter/BlockSplitBloomFilter.java  |  41 +++++-
 .../column/values/bloomfilter/BloomFilter.java     |  15 ++-
 .../values/bloomfilter/BloomFilterReadStore.java   |  34 -----
 .../values/bloomfilter/BloomFilterReader.java      |  32 -----
 .../filter2/BloomFilterLevel/BloomFilterImpl.java  | 150 +++++++++++++++++++++
 .../parquet/filter2/compat/RowGroupFilter.java     |  18 ++-
 ...ilterDataReader.java => BloomFilterReader.java} |  49 +++----
 .../apache/parquet/hadoop/ParquetFileReader.java   |   4 +-
 .../parquet/hadoop/TestParquetFileWriter.java      |   9 +-
 9 files changed, 239 insertions(+), 113 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index b637897..d8ac0b4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -18,11 +18,13 @@
  */
 
 package org.apache.parquet.column.values.bloomfilter;
+
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.io.api.Binary;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -139,7 +141,7 @@ public class BlockSplitBloomFilter implements BloomFilter {
         hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
         break;
       default:
-        throw new RuntimeException("Not supported hash strategy");
+        throw new RuntimeException("Unsupported hash strategy");
     }
   }
 
@@ -255,6 +257,38 @@ public class BlockSplitBloomFilter implements BloomFilter {
   }
 
   @Override
+  public long getBitsetSize() {
+    return this.bitset.length;
+  }
+
+  @Override
+  public long hash(Object value) {
+    ByteBuffer plain;
+
+    if (value instanceof Binary) {
+      return hashFunction.hashBytes(((Binary) value).getBytes()).asLong();
+    }
+
+    if (value instanceof Integer) {
+      plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value).intValue());
+    } else if (value instanceof Long) {
+      plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value).longValue());
+    } else if (value instanceof Float) {
+      plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value).floatValue());
+    } else if (value instanceof Double) {
+      plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value).doubleValue());
+    } else {
+      throw new RuntimeException("Parquet Bloom filter: Not supported type");
+    }
+
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  @Override
   public long hash(int value) {
     ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
     plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
@@ -286,9 +320,4 @@ public class BlockSplitBloomFilter implements BloomFilter {
   public long hash(Binary value) {
     return hashFunction.hashBytes(value.getBytes()).asLong();
   }
-
-  @Override
-  public long getBitsetSize() {
-    return this.bitset.length;
-  }
 }
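
For reference, the hash(Object) overload added above reduces each supported boxed type to its little-endian plain encoding and feeds the bytes to the same Guava murmur3 hash the primitive overloads use. A minimal standalone sketch of that scheme (the class name and the seed value 0 are illustrative assumptions; the real seed is BlockSplitBloomFilter's DEFAULT_SEED):

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class PlainEncodingHashSketch {
      public static void main(String[] args) {
        // Same recipe as hash(Object): plain-encode the value little-endian,
        // then hash the raw bytes with murmur3_128.
        HashFunction hashFunction = Hashing.murmur3_128(0); // seed assumed 0 for illustration
        ByteBuffer plain = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
        plain.order(ByteOrder.LITTLE_ENDIAN).putLong(42L);
        System.out.println(hashFunction.hashBytes(plain.array()).asLong());
      }
    }
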
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index 3ec192e..a6e548f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.bloomfilter;
 
 import org.apache.parquet.io.api.Binary;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -71,6 +72,13 @@ public interface BloomFilter {
   boolean findHash(long hash);
 
   /**
+   * Get the number of bytes for bitset in this Bloom filter.
+   *
+   * @return The number of bytes for bitset in this Bloom filter.
+   */
+  long getBitsetSize();
+
+  /**
    * Compute hash for int value by using its plain encoding result.
    *
    * @param value the value to hash
@@ -111,9 +119,10 @@ public interface BloomFilter {
   long hash(Binary value);
 
   /**
-   * Get the number of bytes for bitset in this Bloom filter.
+   * Compute hash for Object value by using its plain encoding result.
    *
-   * @return The number of bytes for bitset in this Bloom filter.
+   * @param value the value to hash
+   * @return hash result
    */
-  long getBitsetSize();
+  long hash(Object value);
 }
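
Because hash(Object) is specified to use the same plain-encoding scheme as the typed overloads, the two must agree for any implementation. A small sanity-check sketch (class and method names are illustrative):

    import org.apache.parquet.column.values.bloomfilter.BloomFilter;
    import org.apache.parquet.io.api.Binary;

    class BloomFilterHashCheck {
      // For any BloomFilter implementation, the Object overload should produce
      // the same hash as the matching typed overload.
      static void assertOverloadsAgree(BloomFilter bloomFilter) {
        assert bloomFilter.hash((Object) 42) == bloomFilter.hash(42);
        assert bloomFilter.hash((Object) 42L) == bloomFilter.hash(42L);
        assert bloomFilter.hash((Object) 1.5) == bloomFilter.hash(1.5);
        Binary binary = Binary.fromString("hello");
        assert bloomFilter.hash((Object) binary) == bloomFilter.hash(binary);
      }
    }
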
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
deleted file mode 100644
index 3373bc1..0000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-/**
- * contains all the bloom filter reader for all columns of a row group
- */
-public interface BloomFilterReadStore {
-  /**
-   * Get a Bloom filter reader of a column
-   *
-   * @param path the descriptor of the column
-   * @return the corresponding Bloom filter writer
-   */
-  BloomFilterReader getBloomFilterReader(ColumnDescriptor path);
-}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
deleted file mode 100644
index 7a43058..0000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-public interface BloomFilterReader {
-  /**
-   * Returns a {@link BloomFilter} for the given column descriptor.
-   *
-   * @param path the descriptor of the column
- * @return the bloomFilter data for that column, or null if there isn't one
-   */
-  BloomFilter readBloomFilter(ColumnDescriptor path);
-}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
new file mode 100644
index 0000000..c1e3774
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.BloomFilterLevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean> {
+  private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class);
+  private static final boolean BLOCK_MIGHT_MATCH = false;
+  private static final boolean BLOCK_CANNOT_MATCH = true;
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, BloomFilterReader bloomFilterReader) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new BloomFilterImpl(columns, bloomFilterReader));
+  }
+
+  private BloomFilterImpl(List<ColumnChunkMetaData> columnsList, BloomFilterReader bloomFilterReader) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+
+    this.bloomFilterReader = bloomFilterReader;
+  }
+
+  private BloomFilterReader bloomFilterReader;
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    return columns.get(columnPath);
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Eq<T> eq) {
+    T value = eq.getValue();
+
+    if (value == null) {
+      // The Bloom filter bitset contains only non-null values, so it isn't helpful
+      // here. This could check the column stats, but the StatisticsFilter is responsible.
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Operators.Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the value
+      // must be non-null because of the above check.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    try {
+      BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+      if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) {
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (RuntimeException e) {
+      LOG.warn(e.getMessage());
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.NotEq<T> notEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Lt<T> lt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.LtEq<T> ltEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Gt<T> gt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.GtEq<T> gtEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public Boolean visit(Operators.And and) {
+    return and.getLeft().accept(this) || and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Or or) {
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Not not) {
+    throw new IllegalArgumentException(
+      "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> ud, boolean inverted) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> udp) {
+    return visit(udp, false);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.LogicalNotUserDefined<T, U> udp) {
+    return visit(udp.getUserDefined(), true);
+  }
+}
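
Note the inverted Boolean convention above: the visitor answers "can this row group be dropped?", with BLOCK_CANNOT_MATCH = true. That is why And combines its children with || (either side proving "cannot match" is enough to drop the block) and Or with && (both sides must prove it). A hedged usage sketch, building the predicate with the existing FilterApi:

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.io.api.Binary;

    class BloomFilterDropSketch {
      // True when the Bloom filter proves the row group holds no row with
      // name == "hello", i.e. the whole block can be skipped.
      static boolean canSkip(ParquetFileReader reader, BlockMetaData block) {
        FilterPredicate pred = eq(binaryColumn("name"), Binary.fromString("hello"));
        return BloomFilterImpl.canDrop(pred, block.getColumns(),
            reader.getBloomFilterDataReader(block));
      }
    }
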
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d1d40e9..fe6f637 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
 import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
@@ -32,6 +33,8 @@ import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.parquet.Preconditions.checkNotNull;
 
@@ -45,10 +48,12 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
   private final MessageType schema;
   private final List<FilterLevel> levels;
   private final ParquetFileReader reader;
+  private static final Logger logger = LoggerFactory.getLogger(RowGroupFilter.class);
 
   public enum FilterLevel {
     STATISTICS,
-    DICTIONARY
+    DICTIONARY,
+    BLOOMFILTER
   }
 
   /**
@@ -104,6 +109,11 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
         drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
       }
 
+      if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
+        drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
+        if (drop) logger.info("Block dropped by Bloom filter");
+      }
+
       if(!drop) {
         filteredBlocks.add(block);
       }
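
Callers opt in to the new level by including FilterLevel.BLOOMFILTER in the levels list. A sketch, assuming the existing filterRowGroups overload that takes the levels, a filter, the blocks, and an open reader:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.compat.RowGroupFilter;
    import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    class RowGroupFilterSketch {
      // Keep only the row groups that survive statistics, dictionary,
      // and Bloom filter based pruning.
      static List<BlockMetaData> prune(FilterCompat.Filter filter,
                                       List<BlockMetaData> blocks,
                                       ParquetFileReader reader) {
        return RowGroupFilter.filterRowGroups(
            Arrays.asList(FilterLevel.STATISTICS, FilterLevel.DICTIONARY, FilterLevel.BLOOMFILTER),
            filter, blocks, reader);
      }
    }
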
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
similarity index 51%
rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
index 96e258f..3ad91ce 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
@@ -17,55 +17,50 @@
  * under the License.
  */
 package org.apache.parquet.hadoop;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.parquet.Strings;
-import org.apache.parquet.column.ColumnDescriptor;
+
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
 /**
- * A {@link BloomFilterReader} implementation that reads Bloom filter data from
- * an open {@link ParquetFileReader}.
+ * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}.
  *
  */
-public class BloomFilterDataReader implements BloomFilterReader {
+public class BloomFilterReader {
   private final ParquetFileReader reader;
-  private final Map<String, ColumnChunkMetaData> columns;
-  private final Map<String, BloomFilter> cache = new HashMap<>();
-  public BloomFilterDataReader(ParquetFileReader fileReader, BlockMetaData block) {
+  private final Map<ColumnPath, ColumnChunkMetaData> columns;
+  private final Map<ColumnPath, BloomFilter> cache = new HashMap<>();
+
+  public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) {
     this.reader = fileReader;
     this.columns = new HashMap<>();
     for (ColumnChunkMetaData column : block.getColumns()) {
-      columns.put(column.getPath().toDotString(), column);
+      columns.put(column.getPath(), column);
     }
   }
-  @Override
-  public BloomFilter readBloomFilter(ColumnDescriptor descriptor) {
-    String dotPath = Strings.join(descriptor.getPath(), ".");
-    ColumnChunkMetaData column = columns.get(dotPath);
-    if (column == null) {
-      throw new ParquetDecodingException(
-        "Cannot load Bloom filter data, unknown column: " + dotPath);
-    }
-    if (cache.containsKey(dotPath)) {
-      return cache.get(dotPath);
+
+  public BloomFilter readBloomFilter(ColumnChunkMetaData meta) {
+    if (cache.containsKey(meta.getPath())) {
+      return cache.get(meta.getPath());
     }
     try {
       synchronized (cache) {
-        if (!cache.containsKey(dotPath)) {
-          BloomFilter bloomFilter = reader.readBloomFilter(column);
+        if (!cache.containsKey(meta.getPath())) {
+          BloomFilter bloomFilter = reader.readBloomFilter(meta);
           if (bloomFilter == null) return null;
-          cache.put(dotPath, bloomFilter);
+          cache.put(meta.getPath(), bloomFilter);
         }
       }
-      return cache.get(dotPath);
+      return cache.get(meta.getPath());
     } catch (IOException e) {
-      throw new ParquetDecodingException(
-        "Failed to read Bloom data", e);
+      throw new RuntimeException(
+        "Failed to read Bloom filter data", e);
     }
   }
+
 }
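
The renamed reader is keyed by ColumnChunkMetaData instead of ColumnDescriptor and caches one BloomFilter per column path. A minimal read-side sketch mirroring the updated test below:

    import org.apache.parquet.column.values.bloomfilter.BloomFilter;
    import org.apache.parquet.hadoop.BloomFilterReader;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.io.api.Binary;

    class BloomFilterProbeSketch {
      // True when the Bloom filter proves "hello" is absent from the first
      // column chunk of the row group; false means it might be present, or
      // no Bloom filter was written for that chunk.
      static boolean definitelyAbsent(ParquetFileReader reader, BlockMetaData block) {
        BloomFilterReader bloomFilterReader = reader.getBloomFilterDataReader(block);
        ColumnChunkMetaData column = block.getColumns().get(0);
        BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(column);
        if (bloomFilter == null) {
          return false; // nothing stored; cannot rule anything out
        }
        return !bloomFilter.findHash(bloomFilter.hash(Binary.fromString("hello")));
      }
    }
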
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7fe0e41..860e044 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1049,8 +1049,8 @@ public class ParquetFileReader implements Closeable {
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
-  public BloomFilterDataReader getBloomFilterDataReader(BlockMetaData block) {
-    return new BloomFilterDataReader(this, block);
+  public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
+    return new BloomFilterReader(this, block);
   }
 
   /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 71ca5ea..e19e35c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -29,7 +29,6 @@ import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -249,10 +248,10 @@ public class TestParquetFileWriter {
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
     ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
       Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
-    BloomFilterReader bloomFilterReader =  r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
-    BloomFilter bloomDataRead = bloomFilterReader.readBloomFilter(col);
-    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("hello"))));
-    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("world"))));
+    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+    BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello"))));
+    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world"))));
   }
 
   @Test