Posted to commits@parquet.apache.org by ga...@apache.org on 2020/01/07 11:16:51 UTC
[parquet-mr] branch bloom-filter updated: PARQUET-1660: align Bloom filter implementation with format (#686)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/bloom-filter by this push:
new ba28686 PARQUET-1660: align Bloom filter implementation with format (#686)
ba28686 is described below
commit ba28686ac84e22ef8694e4b9d25ec89295267b91
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Tue Jan 7 19:16:42 2020 +0800
PARQUET-1660: align Bloom filter implementation with format (#686)
---
parquet-column/pom.xml | 6 +-
.../apache/parquet/column/ParquetProperties.java | 65 +++++++--
.../parquet/column/impl/ColumnWriteStoreV2.java | 7 +
.../parquet/column/impl/ColumnWriterBase.java | 24 +++-
.../apache/parquet/column/page/PageWriteStore.java | 6 +-
.../values/bloomfilter/BlockSplitBloomFilter.java | 152 +++++++++++++--------
.../column/values/bloomfilter/BloomFilter.java | 19 ++-
.../values/bloomfilter/BloomFilterWriter.java | 2 -
.../{BloomFilterWriter.java => HashFunction.java} | 28 ++--
.../{BloomFilterWriter.java => XxHash.java} | 29 ++--
.../bloomfilter/TestBlockSplitBloomFilter.java | 20 +--
parquet-format-structures/pom.xml | 2 +-
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 8 +-
.../hadoop/InternalParquetRecordWriter.java | 11 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 9 +-
.../apache/parquet/hadoop/ParquetOutputFormat.java | 25 +++-
.../org/apache/parquet/hadoop/ParquetWriter.java | 23 +++-
.../parquet/hadoop/TestParquetFileWriter.java | 20 +--
.../apache/parquet/hadoop/TestParquetWriter.java | 50 ++++++-
pom.xml | 2 +-
20 files changed, 372 insertions(+), 136 deletions(-)
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index 9428508..bce3cf1 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -58,7 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>net.openhft</groupId>
+ <artifactId>zero-allocation-hashing</artifactId>
+ <version>0.9</version>
+ </dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
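The new net.openhft:zero-allocation-hashing dependency provides the XXH64 implementation adopted below. A minimal sketch (illustrative, not part of this patch) of the library call the new code relies on — LongHashFunction.xx(0) is xxHash64 with seed 0 and is deterministic across JVMs:

    import java.nio.charset.StandardCharsets;
    import net.openhft.hashing.LongHashFunction;

    public class XxHashSanityCheck {
      public static void main(String[] args) {
        byte[] data = "parquet".getBytes(StandardCharsets.UTF_8);
        // Seed 0 matches the XxHash class introduced later in this patch.
        long hash = LongHashFunction.xx(0).hashBytes(data);
        System.out.printf("xxHash64(\"parquet\") = 0x%016x%n", hash);
      }
    }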
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index b447912..519c0f3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@ import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
@@ -36,7 +37,9 @@ import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
/**
* This class represents all the configurable Parquet properties.
@@ -52,6 +55,7 @@ public class ParquetProperties {
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
+ public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
@@ -94,13 +98,16 @@ public class ParquetProperties {
// The key-value pair represents the column name and its expected distinct number of values in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
+ private final int maxBloomFilterBytes;
+ private final Set<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
- boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber) {
+ boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber,
+ Set<String> bloomFilterColumns, int maxBloomFilterBytes) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -115,6 +122,8 @@ public class ParquetProperties {
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
+ this.bloomFilterColumns = bloomFilterColumns;
+ this.maxBloomFilterBytes = maxBloomFilterBytes;
this.pageRowCountLimit = pageRowCountLimit;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}
@@ -178,12 +187,13 @@ public class ParquetProperties {
}
public ColumnWriteStore newColumnWriteStore(MessageType schema,
- PageWriteStore pageStore) {
+ PageWriteStore pageStore,
+ BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
case PARQUET_1_0:
- return new ColumnWriteStoreV1(schema, pageStore, this);
+ return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
case PARQUET_2_0:
- return new ColumnWriteStoreV2(schema, pageStore, this);
+ return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
@@ -221,6 +231,14 @@ public class ParquetProperties {
return bloomFilterExpectedDistinctNumbers;
}
+ public Set<String> getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ public int getMaxBloomFilterBytes() {
+ return maxBloomFilterBytes;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -241,6 +259,8 @@ public class ParquetProperties {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
+ private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+ private Set<String> bloomFilterColumns = new HashSet<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
@@ -260,6 +280,8 @@ public class ParquetProperties {
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
+ this.bloomFilterColumns = toCopy.bloomFilterColumns;
+ this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
}
/**
@@ -349,12 +371,34 @@ public class ParquetProperties {
}
/**
- * Set Bloom filter info for columns.
+ * Set the maximum number of bytes for a column's Bloom filter bitset.
+ *
+ * @param maxBloomFilterBytes the maximum number of bytes of a Bloom filter bitset for a column.
+ * @return this builder for method chaining
+ */
+ public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+ this.maxBloomFilterBytes = maxBloomFilterBytes;
+ return this;
+ }
+
+ /**
+ * Set Bloom filter column names.
+ *
+ * @param columns the columns for which the Bloom filter is enabled.
+ * @return this builder for method chaining
+ */
+ public Builder withBloomFilterColumnNames(Set<String> columns) {
+ this.bloomFilterColumns = columns;
+ return this;
+ }
+
+ /**
+ * Set the expected number of distinct values for the Bloom filter columns.
*
* @param columnExpectedNDVs the columns expected number of distinct values in a row group
* @return this builder for method chaining
*/
- public Builder withBloomFilterInfo(Map<String, Long> columnExpectedNDVs) {
+ public Builder withBloomFilterColumnNdvs(Map<String, Long> columnExpectedNDVs) {
this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
return this;
}
@@ -375,7 +419,8 @@ public class ParquetProperties {
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
- pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs);
+ pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs,
+ bloomFilterColumns, maxBloomFilterBytes);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
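Taken together, the builder gains three Bloom filter knobs: the set of enabled columns, a cap on bitset size, and optional per-column expected NDVs. A minimal sketch of wiring them up (the column name "name" and the values are illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import org.apache.parquet.column.ParquetProperties;

    public class BloomFilterPropertiesSketch {
      public static void main(String[] args) {
        ParquetProperties props = ParquetProperties.builder()
            // Columns that get a Bloom filter.
            .withBloomFilterColumnNames(new HashSet<>(Arrays.asList("name")))
            // Cap each bitset at 64 KB instead of the 1 MB default.
            .withMaxBloomFilterBytes(64 * 1024)
            // Optional expected NDV, used to size the bitset.
            .withBloomFilterColumnNdvs(Collections.singletonMap("name", 10_000L))
            .build();
        System.out.println(props.getBloomFilterColumns()
            + ", max bytes: " + props.getMaxBloomFilterBytes());
      }
    }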
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index 953b63e..590c3ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,7 @@ import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
@@ -31,6 +32,12 @@ public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
super(schema, pageWriteStore, props);
}
+ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ super(schema, pageWriteStore, bloomFilterWriteStore, props);
+ }
+
@Override
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 73f6138..b3b799a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.impl;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
@@ -85,14 +86,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
if (path.getPath().length != 1 || bloomFilterWriter == null) {
return;
}
+ String column = path.getPath()[0];
this.bloomFilterWriter = bloomFilterWriter;
+ Set<String> bloomFilterColumns = props.getBloomFilterColumns();
+ if (!bloomFilterColumns.contains(column)) {
+ return;
+ }
+ int maxBloomFilterSize = props.getMaxBloomFilterBytes();
+
Map<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
- String column = path.getPath()[0];
- if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) {
- int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
- BlockSplitBloomFilter.DEFAULT_FPP);
- this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits/8);
+ if (!bloomFilterColumnExpectedNDVs.isEmpty()) {
+ // If the user specified an expected NDV for this column, size the Bloom filter from it.
+ if (bloomFilterColumnExpectedNDVs.containsKey(column)) {
+ int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
+ BlockSplitBloomFilter.DEFAULT_FPP);
+
+ this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
+ }
+ }
+ else {
+ this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
}
}
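The writer-side sizing logic above reduces to: if an expected NDV is configured for the column, derive the bitset size from it at the default FPP; otherwise fall back to the configured maximum. A sketch of that calculation using the public API (the NDV value is illustrative):

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.column.values.bloomfilter.BloomFilter;

    public class BloomFilterSizingSketch {
      public static void main(String[] args) {
        int expectedNdv = 10_000;      // hypothetical per-column NDV
        int maxBytes = 1024 * 1024;    // DEFAULT_MAX_BLOOM_FILTER_BYTES

        // Mirrors ColumnWriterBase: bits for the default FPP, then bytes,
        // clamped to the configured maximum inside the constructor.
        int bits = BlockSplitBloomFilter.optimalNumOfBits(
            expectedNdv, BlockSplitBloomFilter.DEFAULT_FPP);
        BloomFilter filter = new BlockSplitBloomFilter(bits / 8, maxBytes);
        System.out.println("bitset bytes: " + filter.getBitsetSize());
      }
    }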
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
index 0aac63e..aa68cc1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index d8ac0b4..cc9f674 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -19,8 +19,6 @@
package org.apache.parquet.column.values.bloomfilter;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.io.api.Binary;
@@ -42,28 +40,27 @@ public class BlockSplitBloomFilter implements BloomFilter {
// Bytes in a tiny Bloom filter block.
private static final int BYTES_PER_BLOCK = 32;
- // Default seed for the hash function. It comes from System.nanoTime().
- private static final int DEFAULT_SEED = 1361930890;
+ // Bits in a tiny Bloom filter block.
+ private static final int BITS_PER_BLOCK = 256;
- // Minimum Bloom filter size, set to the size of a tiny Bloom filter block
- public static final int MINIMUM_BYTES = 32;
+ // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block
+ public static final int DEFAULT_MINIMUM_BYTES = 32;
- // Maximum Bloom filter size, set to the default HDFS block size for upper boundary check
- // This should be re-consider when implementing write side logic.
- public static final int MAXIMUM_BYTES = 128 * 1024 * 1024;
+ // Default maximum Bloom filter size, set to 1 MB, which should cover most cases.
+ public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024;
// The number of bits to set in a tiny Bloom filter
private static final int BITS_SET_PER_BLOCK = 8;
- // The metadata in the header of a serialized Bloom filter is three four-byte values: the number of bytes,
- // the filter algorithm, and the hash algorithm.
- public static final int HEADER_SIZE = 12;
+ // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes,
+ // the filter algorithm, the hash algorithm, and the compression.
+ public static final int HEADER_SIZE = 16;
// The default false positive probability value
public static final double DEFAULT_FPP = 0.01;
// Hash strategy used in this Bloom filter.
- public final HashStrategy hashStrategy;
+ private final HashStrategy hashStrategy;
// The underlying byte array for Bloom filter bitset.
private byte[] bitset;
@@ -74,51 +71,84 @@ public class BlockSplitBloomFilter implements BloomFilter {
// Hash function used to compute hashes for column values.
private HashFunction hashFunction;
+ private int maximumBytes = DEFAULT_MAXIMUM_BYTES;
+ private int minimumBytes = DEFAULT_MINIMUM_BYTES;
+
// The block-based algorithm needs 8 odd SALT values to calculate eight indexes
// of bits to set, one per 32-bit word.
- private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+ private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
/**
- * Constructor of Bloom filter.
+ * Constructor of block-based Bloom filter.
*
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
- * [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down
+ * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
* to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
- * of 2. It uses murmur3_x64_128 as its default hash function.
+ * of 2. It uses XXH64 as its default hash function.
*/
public BlockSplitBloomFilter(int numBytes) {
- this(numBytes, HashStrategy.MURMUR3_X64_128);
+ this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64);
}
/**
- * Constructor of block-based Bloom filter. It uses murmur3_x64_128 as its default hash
- * function.
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
+ * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
+ * to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
+ * of 2. It uses XXH64 as its default hash function.
+ * @param maximumBytes The maximum bytes of the Bloom filter.
+ */
+ public BlockSplitBloomFilter(int numBytes, int maximumBytes) {
+ this(numBytes, maximumBytes, HashStrategy.XXH64);
+ }
+
+ /**
+ * Constructor of block-based Bloom filter.
*
* @param numBytes The number of bytes for Bloom filter bitset
* @param hashStrategy The hash strategy of Bloom filter.
*/
private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+ this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy);
+ }
+
+ /**
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
+ * [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down to lower/upper bound if
+ * num_bytes is out of range. It will also be rounded up to a power of 2.
+ * @param maximumBytes The maximum bytes of the Bloom filter.
+ * @param hashStrategy The adopted hash strategy of the Bloom filter.
+ */
+ public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) {
+ if (maximumBytes > DEFAULT_MINIMUM_BYTES) {
+ this.maximumBytes = maximumBytes;
+ }
initBitset(numBytes);
+
switch (hashStrategy) {
- case MURMUR3_X64_128:
+ case XXH64:
this.hashStrategy = hashStrategy;
- hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+ hashFunction = new XxHash();
break;
default:
throw new RuntimeException("Unsupported hash strategy");
}
}
+
/**
* Construct the Bloom filter with given bitset, it is used when reconstructing
- * Bloom filter from parquet file. It use murmur3_x64_128 as its default hash
+ * Bloom filter from parquet file. It uses XXH64 as its default hash
* function.
*
* @param bitset The given bitset to construct Bloom filter.
*/
public BlockSplitBloomFilter(byte[] bitset) {
- this(bitset, HashStrategy.MURMUR3_X64_128);
+ this(bitset, HashStrategy.XXH64);
}
/**
@@ -136,9 +166,9 @@ public class BlockSplitBloomFilter implements BloomFilter {
this.bitset = bitset;
this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
switch (hashStrategy) {
- case MURMUR3_X64_128:
+ case XXH64:
this.hashStrategy = hashStrategy;
- hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+ hashFunction = new XxHash();
break;
default:
throw new RuntimeException("Unsupported hash strategy");
@@ -149,21 +179,21 @@ public class BlockSplitBloomFilter implements BloomFilter {
* Create a new bitset for Bloom filter.
*
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
- * [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down
+ * [minimumBytes, maximumBytes], it will be rounded up/down
* to lower/upper bound if num_bytes is out of range and also will be rounded up to a power
- * of 2. It uses murmur3_x64_128 as its default hash function and block-based algorithm
+ * of 2. It uses XXH64 as its default hash function and block-based algorithm
* as default algorithm.
*/
private void initBitset(int numBytes) {
- if (numBytes < MINIMUM_BYTES) {
- numBytes = MINIMUM_BYTES;
+ if (numBytes < minimumBytes) {
+ numBytes = minimumBytes;
}
// Get next power of 2 if it is not power of 2.
if ((numBytes & (numBytes - 1)) != 0) {
numBytes = Integer.highestOneBit(numBytes) << 1;
}
- if (numBytes > MAXIMUM_BYTES || numBytes < 0) {
- numBytes = MAXIMUM_BYTES;
+ if (numBytes > maximumBytes || numBytes < 0) {
+ numBytes = maximumBytes;
}
this.bitset = new byte[numBytes];
this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
@@ -177,12 +207,14 @@ public class BlockSplitBloomFilter implements BloomFilter {
out.write(BytesUtils.intToBytes(hashStrategy.value));
// Write algorithm
out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+ // Write compression
+ out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value));
// Write bitset
out.write(bitset);
}
private int[] setMask(int key) {
- int mask[] = new int[BITS_SET_PER_BLOCK];
+ int[] mask = new int[BITS_SET_PER_BLOCK];
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
mask[i] = key * SALT[i];
@@ -199,27 +231,31 @@ public class BlockSplitBloomFilter implements BloomFilter {
@Override
public void insertHash(long hash) {
- int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+ long numBlocks = bitset.length / BYTES_PER_BLOCK;
+ long lowHash = hash >>> 32;
+ int blockIndex = (int)((lowHash * numBlocks) >> 32);
int key = (int)hash;
// Calculate mask for bucket.
- int mask[] = setMask(key);
+ int[] mask = setMask(key);
for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
- int value = intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i);
+ int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i);
value |= mask[i];
- intBuffer.put(bucketIndex * (BYTES_PER_BLOCK / 4) + i, value);
+ intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value);
}
}
@Override
public boolean findHash(long hash) {
- int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+ long numBlocks = bitset.length / BYTES_PER_BLOCK;
+ long lowHash = hash >>> 32;
+ int blockIndex = (int)((lowHash * numBlocks) >> 32);
int key = (int)hash;
// Calculate mask for the tiny Bloom filter.
- int mask[] = setMask(key);
+ int[] mask = setMask(key);
for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
- if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+ if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
return false;
}
}
@@ -238,19 +274,19 @@ public class BlockSplitBloomFilter implements BloomFilter {
Preconditions.checkArgument((p > 0.0 && p < 1.0),
"FPP should be less than 1.0 and great than 0.0");
final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
- final double MAX = MAXIMUM_BYTES << 3;
+ final double MAX = DEFAULT_MAXIMUM_BYTES << 3;
int numBits = (int)m;
// Handle overflow.
if (m > MAX || m < 0) {
numBits = (int)MAX;
}
- // Get next power of 2 if bits is not power of 2.
- if ((numBits & (numBits - 1)) != 0) {
- numBits = Integer.highestOneBit(numBits) << 1;
- }
- if (numBits < (MINIMUM_BYTES << 3)) {
- numBits = MINIMUM_BYTES << 3;
+
+ // Round numBits up to the next multiple of BITS_PER_BLOCK
+ numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1);
+
+ if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) {
+ numBits = DEFAULT_MINIMUM_BYTES << 3;
}
return numBits;
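With the mask fixed to ~(BITS_PER_BLOCK - 1), this rounds numBits up to the next multiple of the 256-bit block size. A worked example of the formula (standalone sketch, constants copied from this class): n = 10,000 and p = 0.01 give m = -8n / ln(1 - p^(1/8)) ≈ 96,818 bits, which rounds up to 97,024 bits, i.e. 379 blocks:

    public class OptimalBitsSketch {
      public static void main(String[] args) {
        final int BITS_PER_BLOCK = 256;
        int n = 10_000;     // expected distinct values
        double p = 0.01;    // target false positive probability

        // Same formula as BlockSplitBloomFilter.optimalNumOfBits.
        double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
        int numBits = (int) m;                                            // ~96,818
        numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1); // 97,024

        System.out.println(numBits + " bits = " + (numBits / 8) + " bytes");
      }
    }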
@@ -266,58 +302,58 @@ public class BlockSplitBloomFilter implements BloomFilter {
ByteBuffer plain;
if (value instanceof Binary) {
- return hashFunction.hashBytes(((Binary) value).getBytes()).asLong();
+ return hashFunction.hashBytes(((Binary) value).getBytes());
}
if (value instanceof Integer) {
plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
- plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value).intValue());
+ plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
} else if (value instanceof Long) {
plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
- plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value).longValue());
+ plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
} else if (value instanceof Float) {
plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
- plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value).floatValue());
+ plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
} else if (value instanceof Double) {
plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
- plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value).doubleValue());
+ plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
} else {
throw new RuntimeException("Parquet Bloom filter: Not supported type");
}
- return hashFunction.hashBytes(plain.array()).asLong();
+ return hashFunction.hashByteBuffer(plain);
}
@Override
public long hash(int value) {
ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
- return hashFunction.hashBytes(plain.array()).asLong();
+ return hashFunction.hashByteBuffer(plain);
}
@Override
public long hash(long value) {
ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
- return hashFunction.hashBytes(plain.array()).asLong();
+ return hashFunction.hashByteBuffer(plain);
}
@Override
public long hash(double value) {
ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
- return hashFunction.hashBytes(plain.array()).asLong();
+ return hashFunction.hashByteBuffer(plain);
}
@Override
public long hash(float value) {
ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
- return hashFunction.hashBytes(plain.array()).asLong();
+ return hashFunction.hashByteBuffer(plain);
}
@Override
public long hash(Binary value) {
- return hashFunction.hashBytes(value.getBytes()).asLong();
+ return hashFunction.hashBytes(value.getBytes());
}
}
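Two behavioral changes above are worth spelling out. First, the block index is no longer a power-of-two mask: blockIndex = ((hash >>> 32) * numBlocks) >> 32 scales the upper 32 bits of the hash uniformly onto [0, numBlocks), so the bitset no longer has to be a power of two for indexing to stay uniform. Second, the hash/insert/find round trip is unchanged from the caller's point of view. A minimal sketch of that round trip:

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.column.values.bloomfilter.BloomFilter;
    import org.apache.parquet.io.api.Binary;

    public class InsertFindSketch {
      public static void main(String[] args) {
        BloomFilter filter = new BlockSplitBloomFilter(1024);

        long hash = filter.hash(Binary.fromString("parquet"));
        filter.insertHash(hash);

        // Inserted hashes are always found; other probes may yield
        // false positives but never false negatives.
        System.out.println(filter.findHash(hash)); // true
      }
    }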
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index a6e548f..8b26c97 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -29,9 +29,15 @@ import java.io.OutputStream;
* a hash strategy and a Bloom filter algorithm.
*/
public interface BloomFilter {
- // Bloom filter Hash strategy.
+ /* Bloom filter Hash strategy.
+ *
+ * xxHash is an extremely fast hash algorithm that runs at RAM speed limits. It passes the
+ * SMHasher test suite, which evaluates the collision, dispersion, and randomness qualities
+ * of hash functions, and its published benchmarks show a clear performance advantage
+ * (see https://github.com/Cyan4973/xxHash).
+ */
enum HashStrategy {
- MURMUR3_X64_128(0);
+ XXH64(0);
HashStrategy(int value) {
this.value = value;
}
@@ -47,6 +53,15 @@ public interface BloomFilter {
int value;
}
+ // Bloom filter compression.
+ enum Compression {
+ UNCOMPRESSED(0);
+ Compression(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
/**
* Write the Bloom filter to an output stream. It writes the Bloom filter header including the
* bitset's length in bytes, the hash strategy, the algorithm, and the bitset.
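With the new Compression field, the serialized header is now four little-endian int32 values (HEADER_SIZE = 16): the bitset length in bytes, the hash strategy, the algorithm, and the compression, followed by the bitset itself. A reader-side sketch under that assumption (the helper name is hypothetical):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class BloomFilterHeaderSketch {
      // Reads the 16-byte header written by BloomFilter.writeTo; the
      // numBytes of bitset follow in the same stream.
      static int[] readHeader(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int[] header = new int[4]; // {numBytes, hashStrategy, algorithm, compression}
        for (int i = 0; i < 4; i++) {
          // DataInputStream reads big-endian; the on-disk ints are little-endian.
          header[i] = Integer.reverseBytes(data.readInt());
        }
        return header;
      }
    }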
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
index 0fab73b..e2504d8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
index 0fab73b..2043934 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,16 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.parquet.column.values.bloomfilter;
-public interface BloomFilterWriter {
+import java.nio.ByteBuffer;
+
+/**
+ * An interface defining the hash functions used by the Bloom filter.
+ */
+public interface HashFunction {
+
/**
- * Write a Bloom filter
- *
- * @param bloomFilter the Bloom filter to write
- *
+ * Compute the hash value for a byte array.
+ * @param input the input byte array
+ * @return the hash as a long value.
*/
- void writeBloomFilter(BloomFilter bloomFilter);
-}
+ long hashBytes(byte[] input);
+ /**
+ * Compute the hash value for a ByteBuffer.
+ * @param input the input ByteBuffer
+ * @return the hash as a long value.
+ */
+ long hashByteBuffer(ByteBuffer input);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
index 0fab73b..6c52b3c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,13 +19,22 @@
package org.apache.parquet.column.values.bloomfilter;
-public interface BloomFilterWriter {
- /**
- * Write a Bloom filter
- *
- * @param bloomFilter the Bloom filter to write
- *
- */
- void writeBloomFilter(BloomFilter bloomFilter);
-}
+import net.openhft.hashing.LongHashFunction;
+
+import java.nio.ByteBuffer;
+/**
+ * An implementation of the HashFunction interface based on the XXH64 variant of
+ * xxHash, with a seed of 0.
+ */
+public class XxHash implements HashFunction {
+ @Override
+ public long hashBytes(byte[] input) {
+ return LongHashFunction.xx(0).hashBytes(input);
+ }
+
+ @Override
+ public long hashByteBuffer(ByteBuffer input) {
+ return LongHashFunction.xx(0).hashBytes(input);
+ }
+}
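One caveat with the ByteBuffer overload: LongHashFunction.xx(0).hashBytes(ByteBuffer) hashes the bytes between the buffer's position and limit, so a freshly written buffer must be flipped before hashing or zero bytes are hashed. A small demonstration (illustrative, not from this patch):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import net.openhft.hashing.LongHashFunction;

    public class ByteBufferHashSketch {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(42L);
        buf.flip(); // without this, position == limit and nothing is hashed

        long viaBuffer = LongHashFunction.xx(0).hashBytes(buf);
        long viaArray  = LongHashFunction.xx(0).hashBytes(buf.array());
        System.out.println(viaBuffer == viaArray); // true: same 8 bytes either way
      }
    }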
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 8dbb0ba..d75c0e2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -37,11 +37,11 @@ import static org.junit.Assert.assertTrue;
public class TestBlockSplitBloomFilter {
@Test
- public void testConstructor () throws IOException {
+ public void testConstructor () {
BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
- assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.MINIMUM_BYTES);
- BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.MAXIMUM_BYTES + 1);
- assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.MAXIMUM_BYTES);
+ assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MINIMUM_BYTES);
+ BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES + 1);
+ assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES);
BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
assertEquals(bloomFilter3.getBitsetSize(), 1024);
}
@@ -55,7 +55,7 @@ public class TestBlockSplitBloomFilter {
*/
@Test
public void testBasic () throws IOException {
- final String testStrings[] = {"hello", "parquet", "bloom", "filter"};
+ final String[] testStrings = {"hello", "parquet", "bloom", "filter"};
BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
for(int i = 0; i < testStrings.length; i++) {
@@ -75,17 +75,21 @@ public class TestBlockSplitBloomFilter {
fileInputStream.read(value);
int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
- assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal());
+ assertEquals(hash, BloomFilter.HashStrategy.XXH64.ordinal());
fileInputStream.read(value);
int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+ fileInputStream.read(value);
+ int compression = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(compression, BloomFilter.Compression.UNCOMPRESSED.ordinal());
+
byte[] bitset = new byte[length];
fileInputStream.read(bitset);
bloomFilter = new BlockSplitBloomFilter(bitset);
- for(int i = 0; i < testStrings.length; i++) {
- assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testStrings[i]))));
+ for (String testString : testStrings) {
+ assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString))));
}
}
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index a5dbabb..a05cd2b 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -70,7 +70,7 @@
<plugin>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift-maven-plugin</artifactId>
- <version>${thrift-maven-plugin.version}</version>
+ <version>${thrift-maven-plugin.version}</version>
<configuration>
<thriftSourceRoot>${parquet.thrift.path}</thriftSourceRoot>
<thriftExecutable>${format.thrift.executable}</thriftExecutable>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 897c7c2..d2e4c96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -38,6 +38,7 @@ import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
@@ -49,7 +50,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ColumnChunkPageWriteStore implements PageWriteStore {
+class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -321,6 +322,11 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
return writers.get(path);
}
+ @Override
+ public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+ return writers.get(path);
+ }
+
public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
for (ColumnDescriptor path : schema.getColumns()) {
ColumnChunkPageWriter pageWriter = writers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index c3da323..18ee788 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -64,6 +65,7 @@ class InternalParquetRecordWriter<T> {
private ColumnWriteStore columnStore;
private ColumnChunkPageWriteStore pageStore;
+ private BloomFilterWriteStore bloomFilterWriteStore;
private RecordConsumer recordConsumer;
/**
@@ -101,9 +103,12 @@ class InternalParquetRecordWriter<T> {
}
private void initStore() {
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
- props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
- columnStore = props.newColumnWriteStore(schema, pageStore);
+ ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
+ schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
+ pageStore = columnChunkPageWriteStore;
+ bloomFilterWriteStore = columnChunkPageWriteStore;
+
+ columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
this.recordConsumer = columnIO.getRecordWriter(columnStore);
writeSupport.prepareForWrite(recordConsumer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ba15558..4cd846c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1074,12 +1074,12 @@ public class ParquetFileReader implements Closeable {
ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
int numBytes = headerBuffer.get();
- if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.MAXIMUM_BYTES) {
+ if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES) {
return null;
}
BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()];
- if (hash != BlockSplitBloomFilter.HashStrategy.MURMUR3_X64_128) {
+ if (hash != BlockSplitBloomFilter.HashStrategy.XXH64) {
return null;
}
@@ -1088,6 +1088,11 @@ public class ParquetFileReader implements Closeable {
return null;
}
+ BloomFilter.Compression compression = BloomFilter.Compression.values()[headerBuffer.get()];
+ if (compression != BlockSplitBloomFilter.Compression.UNCOMPRESSED) {
+ return null;
+ }
+
byte[] bitset = new byte[numBytes];
f.readFully(bitset);
return new BlockSplitBloomFilter(bitset);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 8741aee..4a72134 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -23,8 +23,11 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -148,6 +151,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+ public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
@@ -215,6 +219,20 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
return getEnableDictionary(getConfiguration(jobContext));
}
+ public static int getBloomFilterMaxBytes(Configuration conf) {
+ return conf.getInt(BLOOM_FILTER_MAX_BYTES,
+ ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES);
+ }
+
+ public static Set<String> getBloomFilterColumns(Configuration conf) {
+ String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+ if (columnNames != null) {
+ return new HashSet<>(Arrays.asList(columnNames.split(",")));
+ } else {
+ return new HashSet<>();
+ }
+ }
+
public static Map<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
Map<String, Long> kv = new HashMap<>();
String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
@@ -443,7 +461,9 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
.withPageSize(getPageSize(conf))
.withDictionaryPageSize(getDictionaryPageSize(conf))
.withDictionaryEncoding(getEnableDictionary(conf))
- .withBloomFilterInfo(getBloomFilterColumnExpectedNDVs(conf))
+ .withBloomFilterColumnNames(getBloomFilterColumns(conf))
+ .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
+ .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
.withWriterVersion(getWriterVersion(conf))
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
@@ -469,7 +489,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
- LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumnExpectedNDVs().keySet());
+ LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes());
+ LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumns());
LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
props.getBloomFilterColumnExpectedNDVs().values());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7fb7186..638d4e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -538,6 +540,21 @@ public class ParquetWriter<T> implements Closeable {
}
/**
+ * Sets the columns for which Bloom filters are written by the constructed writer.
+ * @param columnNames the names of the columns to enable Bloom filters for
+ * @return this builder for method chaining.
+ */
+ public SELF withBloomFilterColumnNames(String... columnNames) {
+ if (columnNames != null) {
+ encodingPropsBuilder.withBloomFilterColumnNames(
+ new HashSet<>(Arrays.asList(columnNames))
+ );
+ }
+
+ return self();
+ }
+
+ /**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
*
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index dd84e63..3de4524 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -222,14 +222,14 @@ public class TestParquetFileWriter {
}
@Test
- public void testBloomWriteRead() throws Exception {
+ public void testBloomFilterWriteRead() throws Exception {
MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
File testFile = temp.newFile();
testFile.delete();
Path path = new Path(testFile.toURI());
Configuration configuration = new Configuration();
- configuration.set("parquet.bloomFilter.filter.column.names", "foo");
- String colPath[] = {"foo"};
+ configuration.set("parquet.bloom.filter.column.names", "foo");
+ String[] colPath = {"foo"};
ColumnDescriptor col = schema.getColumnDescription(colPath);
BinaryStatistics stats1 = new BinaryStatistics();
ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
@@ -239,19 +239,19 @@ public class TestParquetFileWriter {
w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
- BloomFilter bloomData = new BlockSplitBloomFilter(0);
- bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
- bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
- w.writeBloomFilter(bloomData);
+ BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0);
+ blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello")));
+ blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world")));
+ w.writeBloomFilter(blockSplitBloomFilter);
w.endBlock();
- w.end(new HashMap<String, String>());
+ w.end(new HashMap<>());
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
- assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello"))));
- assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world"))));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
}
@Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 25c9608..343b1fa 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
package org.apache.parquet.hadoop;
import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -45,8 +47,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import net.openhft.hashing.LongHashFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.hadoop.example.ExampleInputFormat;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.GroupType;
@@ -207,4 +212,43 @@ public class TestParquetWriter {
assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
}
}
+
+ @Test
+ public void testParquetFileWithBloomFilter() throws IOException {
+ MessageType schema = Types.buildMessage().
+ required(BINARY).as(stringType()).named("name").named("msg");
+
+ String[] testNames = {"hello", "parquet", "bloom", "filter"};
+
+ final int recordCount = testNames.length;
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ GroupFactory factory = new SimpleGroupFactory(schema);
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withPageRowCountLimit(10)
+ .withConf(conf)
+ .withDictionaryEncoding(false)
+ .withBloomFilterColumnNames("name")
+ .build()) {
+ for (String testName : testNames) {
+ writer.write(factory.newGroup().append("name", testName));
+ }
+ }
+
+ ParquetMetadata footer = readFooter(conf, path, NO_FILTER);
+ ParquetFileReader reader = new ParquetFileReader(
+ conf, footer.getFileMetaData(), path, footer.getBlocks(), schema.getColumns());
+
+ BloomFilter bloomFilter = reader.getBloomFilterDataReader(footer.getBlocks().get(0))
+ .readBloomFilter(footer.getBlocks().get(0).getColumns().get(0));
+
+ for (String name: testNames) {
+ assertTrue(bloomFilter.findHash(
+ LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 5dd4aae..6ea28ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
<hadoop1.version>1.2.1</hadoop1.version>
<cascading.version>2.7.1</cascading.version>
<cascading3.version>3.1.2</cascading3.version>
- <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
+ <parquet.format.version>2.7.0</parquet.format.version>
<previous.version>1.7.0</previous.version>
<thrift.executable>thrift</thrift.executable>
<format.thrift.executable>thrift</format.thrift.executable>