You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2019/03/19 09:36:10 UTC
[parquet-mr] branch bloom-filter updated: PARQUET-1391: Integrate
Bloom filter logic (#619)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/bloom-filter by this push:
new dd7e655 PARQUET-1391: Integrate Bloom filter logic (#619)
dd7e655 is described below
commit dd7e655de908cb4b35e1ea94f5b54525865dce6d
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Tue Mar 19 17:36:02 2019 +0800
PARQUET-1391: Integrate Bloom filter logic (#619)
---
.../values/bloomfilter/BlockSplitBloomFilter.java | 41 +++++-
.../column/values/bloomfilter/BloomFilter.java | 15 ++-
.../values/bloomfilter/BloomFilterReadStore.java | 34 -----
.../values/bloomfilter/BloomFilterReader.java | 32 -----
.../filter2/BloomFilterLevel/BloomFilterImpl.java | 150 +++++++++++++++++++++
.../parquet/filter2/compat/RowGroupFilter.java | 18 ++-
...ilterDataReader.java => BloomFilterReader.java} | 49 +++----
.../apache/parquet/hadoop/ParquetFileReader.java | 4 +-
.../parquet/hadoop/TestParquetFileWriter.java | 9 +-
9 files changed, 239 insertions(+), 113 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index b637897..d8ac0b4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -18,11 +18,13 @@
*/
package org.apache.parquet.column.values.bloomfilter;
+
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.io.api.Binary;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -139,7 +141,7 @@ public class BlockSplitBloomFilter implements BloomFilter {
hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
break;
default:
- throw new RuntimeException("Not supported hash strategy");
+ throw new RuntimeException("Unsupported hash strategy");
}
}
@@ -255,6 +257,38 @@ public class BlockSplitBloomFilter implements BloomFilter {
}
@Override
+ public long getBitsetSize() {
+ return this.bitset.length;
+ }
+
+ @Override
+ public long hash(Object value) {
+ ByteBuffer plain;
+
+ if (value instanceof Binary) {
+ return hashFunction.hashBytes(((Binary) value).getBytes()).asLong();
+ }
+
+ if (value instanceof Integer) {
+ plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value).intValue());
+ } else if (value instanceof Long) {
+ plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value).longValue());
+ } else if (value instanceof Float) {
+ plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value).floatValue());
+ } else if (value instanceof Double) {
+ plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value).doubleValue());
+ } else {
+ throw new RuntimeException("Parquet Bloom filter: Unsupported type");
+ }
+
+ return hashFunction.hashBytes(plain.array()).asLong();
+ }
+
+ @Override
public long hash(int value) {
ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
@@ -286,9 +320,4 @@ public class BlockSplitBloomFilter implements BloomFilter {
public long hash(Binary value) {
return hashFunction.hashBytes(value.getBytes()).asLong();
}
-
- @Override
- public long getBitsetSize() {
- return this.bitset.length;
- }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index 3ec192e..a6e548f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values.bloomfilter;
import org.apache.parquet.io.api.Binary;
+
import java.io.IOException;
import java.io.OutputStream;
@@ -71,6 +72,13 @@ public interface BloomFilter {
boolean findHash(long hash);
/**
+ * Get the number of bytes for bitset in this Bloom filter.
+ *
+ * @return The number of bytes for bitset in this Bloom filter.
+ */
+ long getBitsetSize();
+
+ /**
* Compute hash for int value by using its plain encoding result.
*
* @param value the value to hash
@@ -111,9 +119,10 @@ public interface BloomFilter {
long hash(Binary value);
/**
- * Get the number of bytes for bitset in this Bloom filter.
+ * Compute hash for Object value by using its plain encoding result.
*
- * @return The number of bytes for bitset in this Bloom filter.
+ * @param value the value to hash
+ * @return hash result
*/
- long getBitsetSize();
+ long hash(Object value);
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
deleted file mode 100644
index 3373bc1..0000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-/**
- * Contains the Bloom filter readers for all columns of a row group
- */
-public interface BloomFilterReadStore {
- /**
- * Get a Bloom filter reader of a column
- *
- * @param path the descriptor of the column
- * @return the corresponding Bloom filter writer
- */
- BloomFilterReader getBloomFilterReader(ColumnDescriptor path);
-}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
deleted file mode 100644
index 7a43058..0000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-public interface BloomFilterReader {
- /**
- * Returns a {@link BloomFilter} for the given column descriptor.
- *
- * @param path the descriptor of the column
- * @return the bloomFilter data for that column, or null if there isn't one
- */
- BloomFilter readBloomFilter(ColumnDescriptor path);
-}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
new file mode 100644
index 0000000..c1e3774
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.BloomFilterLevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean>{
+ private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class);
+ private static final boolean BLOCK_MIGHT_MATCH = false;
+ private static final boolean BLOCK_CANNOT_MATCH = true;
+
+ private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+ public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, BloomFilterReader bloomFilterReader) {
+ checkNotNull(pred, "pred");
+ checkNotNull(columns, "columns");
+ return pred.accept(new BloomFilterImpl(columns, bloomFilterReader));
+ }
+
+ private BloomFilterImpl(List<ColumnChunkMetaData> columnsList, BloomFilterReader bloomFilterReader) {
+ for (ColumnChunkMetaData chunk : columnsList) {
+ columns.put(chunk.getPath(), chunk);
+ }
+
+ this.bloomFilterReader = bloomFilterReader;
+ }
+
+ private BloomFilterReader bloomFilterReader;
+
+ private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+ return columns.get(columnPath);
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Eq<T> eq) {
+ T value = eq.getValue();
+
+ if (value == null) {
+ // the bloom filter bitset contains only non-null values so isn't helpful. this
+ // could check the column stats, but the StatisticsFilter is responsible
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ Operators.Column<T> filterColumn = eq.getColumn();
+ ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+ if (meta == null) {
+ // the column isn't in this file so all values are null, but the value
+ // must be non-null because of the above check.
+ return BLOCK_CANNOT_MATCH;
+ }
+
+ try {
+ BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+ if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) {
+ return BLOCK_CANNOT_MATCH;
+ }
+ } catch (RuntimeException e) {
+ LOG.warn(e.getMessage());
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.NotEq<T> notEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Lt<T> lt) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.LtEq<T> ltEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Gt<T> gt) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.GtEq<T> gtEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public Boolean visit(Operators.And and) {
+ return and.getLeft().accept(this) || and.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Operators.Or or) {
+ return or.getLeft().accept(this) && or.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Operators.Not not) {
+ throw new IllegalArgumentException(
+ "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+ }
+
+ private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> ud, boolean inverted) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> udp) {
+ return visit(udp, false);
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.LogicalNotUserDefined<T, U> udp) {
+ return visit(udp.getUserDefined(), true);
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d1d40e9..fe6f637 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
@@ -32,6 +33,8 @@ import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.parquet.Preconditions.checkNotNull;
@@ -45,10 +48,12 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
private final MessageType schema;
private final List<FilterLevel> levels;
private final ParquetFileReader reader;
+ private Logger logger = LoggerFactory.getLogger(RowGroupFilter.class);
public enum FilterLevel {
STATISTICS,
- DICTIONARY
+ DICTIONARY,
+ BLOOMFILTER
}
/**
@@ -104,6 +109,11 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
}
+ if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
+ drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
+ if (drop) logger.info("Block dropped by Bloom filter");
+ }
+
if(!drop) {
filteredBlocks.add(block);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
similarity index 51%
rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
index 96e258f..3ad91ce 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
@@ -17,55 +17,50 @@
* under the License.
*/
package org.apache.parquet.hadoop;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.parquet.Strings;
-import org.apache.parquet.column.ColumnDescriptor;
+
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
/**
- * A {@link BloomFilterReader} implementation that reads Bloom filter data from
- * an open {@link ParquetFileReader}.
+ * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}.
*
*/
-public class BloomFilterDataReader implements BloomFilterReader {
+public class BloomFilterReader {
private final ParquetFileReader reader;
- private final Map<String, ColumnChunkMetaData> columns;
- private final Map<String, BloomFilter> cache = new HashMap<>();
- public BloomFilterDataReader(ParquetFileReader fileReader, BlockMetaData block) {
+ private final Map<ColumnPath, ColumnChunkMetaData> columns;
+ private final Map<ColumnPath, BloomFilter> cache = new HashMap<>();
+
+ public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) {
this.reader = fileReader;
this.columns = new HashMap<>();
for (ColumnChunkMetaData column : block.getColumns()) {
- columns.put(column.getPath().toDotString(), column);
+ columns.put(column.getPath(), column);
}
}
- @Override
- public BloomFilter readBloomFilter(ColumnDescriptor descriptor) {
- String dotPath = Strings.join(descriptor.getPath(), ".");
- ColumnChunkMetaData column = columns.get(dotPath);
- if (column == null) {
- throw new ParquetDecodingException(
- "Cannot load Bloom filter data, unknown column: " + dotPath);
- }
- if (cache.containsKey(dotPath)) {
- return cache.get(dotPath);
+
+ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) {
+ if (cache.containsKey(meta.getPath())) {
+ return cache.get(meta.getPath());
}
try {
synchronized (cache) {
- if (!cache.containsKey(dotPath)) {
- BloomFilter bloomFilter = reader.readBloomFilter(column);
+ if (!cache.containsKey(meta.getPath())) {
+ BloomFilter bloomFilter = reader.readBloomFilter(meta);
if (bloomFilter == null) return null;
- cache.put(dotPath, bloomFilter);
+ cache.put(meta.getPath(), bloomFilter);
}
}
- return cache.get(dotPath);
+ return cache.get(meta.getPath());
} catch (IOException e) {
- throw new ParquetDecodingException(
- "Failed to read Bloom data", e);
+ throw new RuntimeException(
+ "Failed to read Bloom filter data", e);
}
}
+
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7fe0e41..860e044 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1049,8 +1049,8 @@ public class ParquetFileReader implements Closeable {
converter.getEncoding(dictHeader.getEncoding()));
}
- public BloomFilterDataReader getBloomFilterDataReader(BlockMetaData block) {
- return new BloomFilterDataReader(this, block);
+ public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
+ return new BloomFilterReader(this, block);
}
/**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 71ca5ea..e19e35c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -29,7 +29,6 @@ import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.junit.Assume;
import org.junit.Rule;
@@ -249,10 +248,10 @@ public class TestParquetFileWriter {
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
- BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
- BloomFilter bloomDataRead = bloomFilterReader.readBloomFilter(col);
- assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("hello"))));
- assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("world"))));
+ BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+ BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+ assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello"))));
+ assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world"))));
}
@Test