Posted to commits@parquet.apache.org by ga...@apache.org on 2020/01/07 11:16:51 UTC

[parquet-mr] branch bloom-filter updated: PARQUET-1660: align Bloom filter implementation with format (#686)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/bloom-filter by this push:
     new ba28686  PARQUET-1660: align Bloom filter implementation with format (#686)
ba28686 is described below

commit ba28686ac84e22ef8694e4b9d25ec89295267b91
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Tue Jan 7 19:16:42 2020 +0800

    PARQUET-1660: align Bloom filter implementation with format (#686)
---
 parquet-column/pom.xml                             |   6 +-
 .../apache/parquet/column/ParquetProperties.java   |  65 +++++++--
 .../parquet/column/impl/ColumnWriteStoreV2.java    |   7 +
 .../parquet/column/impl/ColumnWriterBase.java      |  24 +++-
 .../apache/parquet/column/page/PageWriteStore.java |   6 +-
 .../values/bloomfilter/BlockSplitBloomFilter.java  | 152 +++++++++++++--------
 .../column/values/bloomfilter/BloomFilter.java     |  19 ++-
 .../values/bloomfilter/BloomFilterWriter.java      |   2 -
 .../{BloomFilterWriter.java => HashFunction.java}  |  28 ++--
 .../{BloomFilterWriter.java => XxHash.java}        |  29 ++--
 .../bloomfilter/TestBlockSplitBloomFilter.java     |  20 +--
 parquet-format-structures/pom.xml                  |   2 +-
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |   8 +-
 .../hadoop/InternalParquetRecordWriter.java        |  11 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   |   9 +-
 .../apache/parquet/hadoop/ParquetOutputFormat.java |  25 +++-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  23 +++-
 .../parquet/hadoop/TestParquetFileWriter.java      |  20 +--
 .../apache/parquet/hadoop/TestParquetWriter.java   |  50 ++++++-
 pom.xml                                            |   2 +-
 20 files changed, 372 insertions(+), 136 deletions(-)
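
For reference, a rough sketch of how the writer-side option added by this patch would be used (the schema, path, and column name are placeholders; the builder call mirrors the new test in TestParquetWriter below):

    // Illustrative only, not part of the patch; assumes a "name" binary column in schema.
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);
    GroupFactory factory = new SimpleGroupFactory(schema);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withConf(conf)
        .withBloomFilterColumnNames("name")   // new builder method added by this patch
        .build()) {
      writer.write(factory.newGroup().append("name", "parquet"));
    }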

diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index 9428508..bce3cf1 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -58,7 +58,11 @@
       <artifactId>fastutil</artifactId>
       <version>${fastutil.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>net.openhft</groupId>
+      <artifactId>zero-allocation-hashing</artifactId>
+      <version>0.9</version>
+    </dependency>
     <dependency>
       <groupId>com.carrotsearch</groupId>
       <artifactId>junit-benchmarks</artifactId>
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index b447912..519c0f3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@ import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
@@ -36,7 +37,9 @@ import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class represents all the configurable Parquet properties.
@@ -52,6 +55,7 @@ public class ParquetProperties {
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
+  public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
 
   public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
 
@@ -94,13 +98,16 @@ public class ParquetProperties {
 
   // The key-value pair represents the column name and its expected distinct number of values in a row group.
   private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
+  private final int maxBloomFilterBytes;
+  private final Set<String> bloomFilterColumns;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
                             ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
-                            boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber) {
+                            boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber,
+                            Set<String> bloomFilterColumns, int maxBloomFilterBytes) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -115,6 +122,8 @@ public class ParquetProperties {
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
     this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
+    this.bloomFilterColumns = bloomFilterColumns;
+    this.maxBloomFilterBytes = maxBloomFilterBytes;
     this.pageRowCountLimit = pageRowCountLimit;
     this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
   }
@@ -178,12 +187,13 @@ public class ParquetProperties {
   }
 
   public ColumnWriteStore newColumnWriteStore(MessageType schema,
-                                              PageWriteStore pageStore) {
+                                              PageWriteStore pageStore,
+                                              BloomFilterWriteStore bloomFilterWriteStore) {
     switch (writerVersion) {
     case PARQUET_1_0:
-      return new ColumnWriteStoreV1(schema, pageStore, this);
+      return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
     case PARQUET_2_0:
-      return new ColumnWriteStoreV2(schema, pageStore, this);
+      return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
     default:
       throw new IllegalArgumentException("unknown version " + writerVersion);
     }
@@ -221,6 +231,14 @@ public class ParquetProperties {
     return bloomFilterExpectedDistinctNumbers;
   }
 
+  public Set<String> getBloomFilterColumns() {
+    return bloomFilterColumns;
+  }
+
+  public int getMaxBloomFilterBytes() {
+    return maxBloomFilterBytes;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -241,6 +259,8 @@ public class ParquetProperties {
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
     private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
+    private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+    private Set<String> bloomFilterColumns = new HashSet<>();
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
 
@@ -260,6 +280,8 @@ public class ParquetProperties {
       this.pageRowCountLimit = toCopy.pageRowCountLimit;
       this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
       this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
+      this.bloomFilterColumns = toCopy.bloomFilterColumns;
+      this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
     }
 
     /**
@@ -349,12 +371,34 @@ public class ParquetProperties {
     }
 
     /**
-     * Set Bloom filter info for columns.
+     * Set the maximum number of bytes for a column's Bloom filter bitset.
+     *
+     * @param maxBloomFilterBytes the maximum number of bytes of a Bloom filter bitset for a column.
+     * @return this builder for method chaining
+     */
+    public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+      this.maxBloomFilterBytes = maxBloomFilterBytes;
+      return this;
+    }
+
+    /**
+     * Set Bloom filter column names.
+     *
+     * @param columns the columns which have a Bloom filter enabled.
+     * @return this builder for method chaining
+     */
+    public Builder withBloomFilterColumnNames(Set<String> columns) {
+      this.bloomFilterColumns = columns;
+      return this;
+    }
+
+    /**
+     * Set the expected number of distinct values (NDV) for Bloom filter columns.
      *
      * @param columnExpectedNDVs the columns expected number of distinct values in a row group
      * @return this builder for method chaining
      */
-    public Builder withBloomFilterInfo(Map<String, Long> columnExpectedNDVs) {
+    public Builder withBloomFilterColumnNdvs(Map<String, Long> columnExpectedNDVs) {
       this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
       return this;
     }
@@ -375,7 +419,8 @@ public class ParquetProperties {
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
           estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
-          pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs);
+          pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs,
+          bloomFilterColumns, maxBloomFilterBytes);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
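
The new ParquetProperties builder calls added above can be combined; a rough sketch with an illustrative column name and NDV value (java.util.Collections assumed imported):

    ParquetProperties props = ParquetProperties.builder()
      .withBloomFilterColumnNames(Collections.singleton("foo"))           // columns to build filters for
      .withMaxBloomFilterBytes(ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES)
      .withBloomFilterColumnNdvs(Collections.singletonMap("foo", 1000L))  // optional expected NDV per column
      .build();
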
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index 953b63e..590c3ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,7 @@ import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -31,6 +32,12 @@ public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
     super(schema, pageWriteStore, props);
   }
 
+  public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+                            BloomFilterWriteStore bloomFilterWriteStore,
+                            ParquetProperties props) {
+    super(schema, pageWriteStore, bloomFilterWriteStore, props);
+  }
+
   @Override
   ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
                                       BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 73f6138..b3b799a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.impl;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
@@ -85,14 +86,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
     if (path.getPath().length != 1 || bloomFilterWriter == null) {
       return;
     }
+    String column = path.getPath()[0];
 
     this.bloomFilterWriter = bloomFilterWriter;
+    Set<String> bloomFilterColumns = props.getBloomFilterColumns();
+    if (!bloomFilterColumns.contains(column)) {
+      return;
+    }
+    int maxBloomFilterSize = props.getMaxBloomFilterBytes();
+
     Map<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
-    String column = path.getPath()[0];
-    if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) {
-      int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
-        BlockSplitBloomFilter.DEFAULT_FPP);
-      this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits/8);
+    if (bloomFilterColumnExpectedNDVs.size() > 0) {
+      // If the user specified the column's NDV, construct the Bloom filter from it.
+      if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) {
+        int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
+          BlockSplitBloomFilter.DEFAULT_FPP);
+
+        this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
+      }
+    }
+    else {
+      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
     }
   }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
index 0aac63e..aa68cc1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index d8ac0b4..cc9f674 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -19,8 +19,6 @@
 
 package org.apache.parquet.column.values.bloomfilter;
 
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.io.api.Binary;
@@ -42,28 +40,27 @@ public class BlockSplitBloomFilter implements BloomFilter {
   // Bytes in a tiny Bloom filter block.
   private static final int BYTES_PER_BLOCK = 32;
 
-  // Default seed for the hash function. It comes from System.nanoTime().
-  private static final int DEFAULT_SEED = 1361930890;
+  // Bits in a tiny Bloom filter block.
+  private static final int BITS_PER_BLOCK = 256;
 
-  // Minimum Bloom filter size, set to the size of a tiny Bloom filter block
-  public static final int MINIMUM_BYTES = 32;
+  // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block
+  public static final int DEFAULT_MINIMUM_BYTES = 32;
 
-  // Maximum Bloom filter size, set to the default HDFS block size for upper boundary check
-  // This should be re-consider when implementing write side logic.
-  public static final int MAXIMUM_BYTES = 128 * 1024 * 1024;
+  // Default maximum Bloom filter size, set to 1MB, which should cover most cases.
+  public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024;
 
   // The number of bits to set in a tiny Bloom filter
   private static final int BITS_SET_PER_BLOCK = 8;
 
-  // The metadata in the header of a serialized Bloom filter is three four-byte values: the number of bytes,
-  // the filter algorithm, and the hash algorithm.
-  public static final int HEADER_SIZE = 12;
+  // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes,
+  // the filter algorithm, the hash algorithm, and the compression.
+  public static final int HEADER_SIZE = 16;
 
   // The default false positive probability value
   public static final double DEFAULT_FPP = 0.01;
 
   // Hash strategy used in this Bloom filter.
-  public final HashStrategy hashStrategy;
+  private final HashStrategy hashStrategy;
 
   // The underlying byte array for Bloom filter bitset.
   private byte[] bitset;
@@ -74,51 +71,84 @@ public class BlockSplitBloomFilter implements BloomFilter {
   // Hash function use to compute hash for column value.
   private HashFunction hashFunction;
 
+  private int maximumBytes = DEFAULT_MAXIMUM_BYTES;
+  private int minimumBytes = DEFAULT_MINIMUM_BYTES;
+
   // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
   // of bits to set, one per 32-bit word.
-  private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
     0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
 
   /**
-   * Constructor of Bloom filter.
+   * Constructor of block-based Bloom filter.
    *
    * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
-   *                 [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down
+   *                 [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
    *                 to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
-   *                 of 2. It uses murmur3_x64_128 as its default hash function.
+   *                 of 2. It uses XXH64 as its default hash function.
    */
   public BlockSplitBloomFilter(int numBytes) {
-    this(numBytes, HashStrategy.MURMUR3_X64_128);
+    this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64);
   }
 
   /**
-   * Constructor of block-based Bloom filter. It uses murmur3_x64_128 as its default hash
-   * function.
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
+   *                 [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down
+   *                 to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power
+   *                 of 2. It uses XXH64 as its default hash function.
+   * @param maximumBytes The maximum bytes of the Bloom filter.
+   */
+  public BlockSplitBloomFilter(int numBytes, int maximumBytes) {
+    this(numBytes, maximumBytes, HashStrategy.XXH64);
+  }
+
+  /**
+   * Constructor of block-based Bloom filter.
    *
    * @param numBytes The number of bytes for Bloom filter bitset
    * @param hashStrategy The hash strategy of Bloom filter.
    */
   private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+    this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy);
+  }
+
+  /**
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
+   *                 [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down to lower/upper bound if
+   *                 num_bytes is out of range. It will also be rounded up to a power of 2.
+   * @param maximumBytes The maximum bytes of the Bloom filter.
+   * @param hashStrategy The adopted hash strategy of the Bloom filter.
+   */
+  public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) {
+    if (maximumBytes > DEFAULT_MINIMUM_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
     initBitset(numBytes);
+
     switch (hashStrategy) {
-      case MURMUR3_X64_128:
+      case XXH64:
         this.hashStrategy = hashStrategy;
-        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        hashFunction = new XxHash();
         break;
       default:
         throw new RuntimeException("Unsupported hash strategy");
     }
   }
 
+
   /**
    * Construct the Bloom filter with given bitset, it is used when reconstructing
-   * Bloom filter from parquet file. It use murmur3_x64_128 as its default hash
+   * Bloom filter from parquet file. It uses XXH64 as its default hash
    * function.
    *
    * @param bitset The given bitset to construct Bloom filter.
    */
   public BlockSplitBloomFilter(byte[] bitset) {
-    this(bitset, HashStrategy.MURMUR3_X64_128);
+    this(bitset, HashStrategy.XXH64);
   }
 
   /**
@@ -136,9 +166,9 @@ public class BlockSplitBloomFilter implements BloomFilter {
     this.bitset = bitset;
     this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
     switch (hashStrategy) {
-      case MURMUR3_X64_128:
+      case XXH64:
         this.hashStrategy = hashStrategy;
-        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        hashFunction = new XxHash();
         break;
       default:
         throw new RuntimeException("Unsupported hash strategy");
@@ -149,21 +179,21 @@ public class BlockSplitBloomFilter implements BloomFilter {
    * Create a new bitset for Bloom filter.
    *
    * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within
-   *                 [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down
+   *                 [minimumBytes, maximumBytes], it will be rounded up/down
    *                 to lower/upper bound if num_bytes is out of range and also will rounded up to a power
-   *                 of 2. It uses murmur3_x64_128 as its default hash function and block-based algorithm
+   *                 of 2. It uses XXH64 as its default hash function and block-based algorithm
    *                 as default algorithm.
    */
   private void initBitset(int numBytes) {
-    if (numBytes < MINIMUM_BYTES) {
-      numBytes = MINIMUM_BYTES;
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
     }
     // Get next power of 2 if it is not power of 2.
     if ((numBytes & (numBytes - 1)) != 0) {
       numBytes = Integer.highestOneBit(numBytes) << 1;
     }
-    if (numBytes > MAXIMUM_BYTES || numBytes < 0) {
-      numBytes = MAXIMUM_BYTES;
+    if (numBytes > maximumBytes || numBytes < 0) {
+      numBytes = maximumBytes;
     }
     this.bitset = new byte[numBytes];
     this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
@@ -177,12 +207,14 @@ public class BlockSplitBloomFilter implements BloomFilter {
     out.write(BytesUtils.intToBytes(hashStrategy.value));
     // Write algorithm
     out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+    // Write compression
+    out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value));
     // Write bitset
     out.write(bitset);
   }
 
   private int[] setMask(int key) {
-    int mask[] = new int[BITS_SET_PER_BLOCK];
+    int[] mask = new int[BITS_SET_PER_BLOCK];
 
     for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
       mask[i] = key * SALT[i];
@@ -199,27 +231,31 @@ public class BlockSplitBloomFilter implements BloomFilter {
 
   @Override
   public void insertHash(long hash) {
-    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+    long numBlocks = bitset.length / BYTES_PER_BLOCK;
+    long lowHash = hash >>> 32;
+    int blockIndex = (int)((lowHash * numBlocks) >> 32);
     int key = (int)hash;
 
     // Calculate mask for bucket.
-    int mask[] = setMask(key);
+    int[] mask = setMask(key);
     for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
-      int value = intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i);
+      int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i);
       value |= mask[i];
-      intBuffer.put(bucketIndex * (BYTES_PER_BLOCK / 4) + i, value);
+      intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value);
     }
   }
 
   @Override
   public boolean findHash(long hash) {
-    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+    long numBlocks = bitset.length / BYTES_PER_BLOCK;
+    long lowHash = hash >>> 32;
+    int blockIndex = (int)((lowHash * numBlocks) >> 32);
     int key = (int)hash;
 
     // Calculate mask for the tiny Bloom filter.
-    int mask[] = setMask(key);
+    int[] mask = setMask(key);
     for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
-      if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+      if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
         return false;
       }
     }
@@ -238,19 +274,19 @@ public class BlockSplitBloomFilter implements BloomFilter {
     Preconditions.checkArgument((p > 0.0 && p < 1.0),
       "FPP should be less than 1.0 and great than 0.0");
     final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
-    final double MAX = MAXIMUM_BYTES << 3;
+    final double MAX = DEFAULT_MAXIMUM_BYTES << 3;
     int numBits = (int)m;
 
     // Handle overflow.
     if (m > MAX || m < 0) {
       numBits = (int)MAX;
     }
-    // Get next power of 2 if bits is not power of 2.
-    if ((numBits & (numBits - 1)) != 0) {
-      numBits = Integer.highestOneBit(numBits) << 1;
-    }
-    if (numBits < (MINIMUM_BYTES << 3)) {
-      numBits = MINIMUM_BYTES << 3;
+
+    // Round up to a multiple of BITS_PER_BLOCK
+    numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1);
+
+    if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) {
+      numBits = DEFAULT_MINIMUM_BYTES << 3;
     }
 
     return numBits;
@@ -266,58 +302,58 @@ public class BlockSplitBloomFilter implements BloomFilter {
     ByteBuffer plain;
 
     if (value instanceof Binary) {
-      return hashFunction.hashBytes(((Binary) value).getBytes()).asLong();
+      return hashFunction.hashBytes(((Binary) value).getBytes());
     }
 
     if (value instanceof Integer) {
       plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
-      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value).intValue());
+      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
     } else if (value instanceof Long) {
       plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
-      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value).longValue());
+      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
     } else if (value instanceof Float) {
       plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
-      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value).floatValue());
+      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
     } else if (value instanceof Double) {
       plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
-      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value).doubleValue());
+      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
     } else {
       throw new RuntimeException("Parquet Bloom filter: Not supported type");
     }
 
-    return hashFunction.hashBytes(plain.array()).asLong();
+    return hashFunction.hashByteBuffer(plain);
   }
 
   @Override
   public long hash(int value) {
     ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
     plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
-    return hashFunction.hashBytes(plain.array()).asLong();
+    return hashFunction.hashByteBuffer(plain);
   }
 
   @Override
   public long hash(long value) {
     ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
     plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
-    return hashFunction.hashBytes(plain.array()).asLong();
+    return hashFunction.hashByteBuffer(plain);
   }
 
   @Override
   public long hash(double value) {
     ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
     plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
-    return hashFunction.hashBytes(plain.array()).asLong();
+    return hashFunction.hashByteBuffer(plain);
   }
 
   @Override
   public long hash(float value) {
     ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
     plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
-    return hashFunction.hashBytes(plain.array()).asLong();
+    return hashFunction.hashByteBuffer(plain);
   }
 
   @Override
   public long hash(Binary value) {
-    return hashFunction.hashBytes(value.getBytes()).asLong();
+    return hashFunction.hashBytes(value.getBytes());
   }
 }
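
A note on the block selection above: instead of masking the upper 32 bits of the hash against a power-of-two block count, insertHash/findHash now scale them by the number of 32-byte blocks, which maps uniformly onto [0, numBlocks) for any block count. Restating the two lines from the patch:

    long numBlocks = bitset.length / BYTES_PER_BLOCK;    // number of 32-byte blocks in the bitset
    long high = hash >>> 32;                              // upper 32 bits, treated as unsigned
    int blockIndex = (int) ((high * numBlocks) >> 32);    // scaled into [0, numBlocks)
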
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index a6e548f..8b26c97 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -29,9 +29,15 @@ import java.io.OutputStream;
  * a hash strategy and a Bloom filter algorithm.
  */
 public interface BloomFilter {
-  // Bloom filter Hash strategy.
+  /* Bloom filter Hash strategy.
+   *
+   * xxHash is an extremely fast hash algorithm, running at RAM speed limits. It successfully
+   * completes the SMHasher test suite which evaluates collision, dispersion and randomness qualities
+ * of hash functions, and shows a clear performance advantage in benchmarks
+   * (see https://github.com/Cyan4973/xxHash).
+   */
   enum HashStrategy {
-    MURMUR3_X64_128(0);
+    XXH64(0);
     HashStrategy(int value) {
       this.value = value;
     }
@@ -47,6 +53,15 @@ public interface BloomFilter {
     int value;
   }
 
+  // Bloom filter compression.
+  enum Compression {
+    UNCOMPRESSED(0);
+    Compression(int value) {
+      this.value = value;
+    }
+    int value;
+  }
+
   /**
    * Write the Bloom filter to an output stream. It writes the Bloom filter header including the
    * bitset's length in bytes, the hash strategy, the algorithm, and the bitset.
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
index 0fab73b..e2504d8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
index 0fab73b..2043934 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,16 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.parquet.column.values.bloomfilter;
 
-public interface BloomFilterWriter {
+import java.nio.ByteBuffer;
+
+/**
+ * An interface for the hash functions used by the Bloom filter.
+ */
+public interface HashFunction {
+
   /**
-   * Write a Bloom filter
-   *
-   * @param bloomFilter the Bloom filter to write
-   *
+   * Computes the hash value for a byte array.
+   * @param input the input byte array
+   * @return the hash value as a long.
    */
-  void writeBloomFilter(BloomFilter bloomFilter);
-}
+  long hashBytes(byte[] input);
 
+  /**
+   * Computes the hash value for a ByteBuffer.
+   * @param input the input ByteBuffer
+   * @return the hash value as a long.
+   */
+  long hashByteBuffer(ByteBuffer input);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
index 0fab73b..6c52b3c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,13 +19,22 @@
 
 package org.apache.parquet.column.values.bloomfilter;
 
-public interface BloomFilterWriter {
-  /**
-   * Write a Bloom filter
-   *
-   * @param bloomFilter the Bloom filter to write
-   *
-   */
-  void writeBloomFilter(BloomFilter bloomFilter);
-}
+import net.openhft.hashing.LongHashFunction;
+
+import java.nio.ByteBuffer;
 
+/**
+ * An implementation of the HashFunction interface backed by the XXH64 variant of xxHash
+ * with a seed of 0.
+ */
+public class XxHash implements HashFunction {
+  @Override
+  public long hashBytes(byte[] input) {
+    return LongHashFunction.xx(0).hashBytes(input);
+  }
+
+  @Override
+  public long hashByteBuffer(ByteBuffer input) {
+    return LongHashFunction.xx(0).hashBytes(input);
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 8dbb0ba..d75c0e2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -37,11 +37,11 @@ import static org.junit.Assert.assertTrue;
 
 public class TestBlockSplitBloomFilter {
   @Test
-  public void testConstructor () throws IOException {
+  public void testConstructor () {
     BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
-    assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.MINIMUM_BYTES);
-    BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.MAXIMUM_BYTES + 1);
-    assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.MAXIMUM_BYTES);
+    assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MINIMUM_BYTES);
+    BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES + 1);
+    assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES);
     BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
     assertEquals(bloomFilter3.getBitsetSize(), 1024);
   }
@@ -55,7 +55,7 @@ public class TestBlockSplitBloomFilter {
    */
   @Test
   public void testBasic () throws IOException {
-    final String testStrings[] = {"hello", "parquet", "bloom", "filter"};
+    final String[] testStrings = {"hello", "parquet", "bloom", "filter"};
     BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
 
     for(int i = 0; i < testStrings.length; i++) {
@@ -75,17 +75,21 @@ public class TestBlockSplitBloomFilter {
 
     fileInputStream.read(value);
     int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
-    assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal());
+    assertEquals(hash, BloomFilter.HashStrategy.XXH64.ordinal());
 
     fileInputStream.read(value);
     int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
     assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
 
+    fileInputStream.read(value);
+    int compression = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(compression, BloomFilter.Compression.UNCOMPRESSED.ordinal());
+
     byte[] bitset = new byte[length];
     fileInputStream.read(bitset);
     bloomFilter = new BlockSplitBloomFilter(bitset);
-    for(int i = 0; i < testStrings.length; i++) {
-      assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testStrings[i]))));
+    for (String testString : testStrings) {
+      assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString))));
     }
   }
 
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index a5dbabb..a05cd2b 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -70,7 +70,7 @@
       <plugin>
         <groupId>org.apache.thrift</groupId>
         <artifactId>thrift-maven-plugin</artifactId>
-        <version>${thrift-maven-plugin.version}</version>        
+        <version>${thrift-maven-plugin.version}</version>
         <configuration>
           <thriftSourceRoot>${parquet.thrift.path}</thriftSourceRoot>
           <thriftExecutable>${format.thrift.executable}</thriftExecutable>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 897c7c2..d2e4c96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -38,6 +38,7 @@ import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
@@ -49,7 +50,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class ColumnChunkPageWriteStore implements PageWriteStore {
+class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
   private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -321,6 +322,11 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     return writers.get(path);
   }
 
+  @Override
+  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
   public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     for (ColumnDescriptor path : schema.getColumns()) {
       ColumnChunkPageWriter pageWriter = writers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index c3da323..18ee788 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -64,6 +65,7 @@ class InternalParquetRecordWriter<T> {
 
   private ColumnWriteStore columnStore;
   private ColumnChunkPageWriteStore pageStore;
+  private BloomFilterWriteStore bloomFilterWriteStore;
   private RecordConsumer recordConsumer;
 
   /**
@@ -101,9 +103,12 @@ class InternalParquetRecordWriter<T> {
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
-        props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
-    columnStore = props.newColumnWriteStore(schema, pageStore);
+    ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
+        schema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    pageStore = columnChunkPageWriteStore;
+    bloomFilterWriteStore = columnChunkPageWriteStore;
+
+    columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
     writeSupport.prepareForWrite(recordConsumer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ba15558..4cd846c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1074,12 +1074,12 @@ public class ParquetFileReader implements Closeable {
     ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
     IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
     int numBytes = headerBuffer.get();
-    if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.MAXIMUM_BYTES) {
+    if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES) {
       return null;
     }
 
     BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()];
-    if (hash != BlockSplitBloomFilter.HashStrategy.MURMUR3_X64_128) {
+    if (hash != BlockSplitBloomFilter.HashStrategy.XXH64) {
       return null;
     }
 
@@ -1088,6 +1088,11 @@ public class ParquetFileReader implements Closeable {
       return null;
     }
 
+    BloomFilter.Compression compression = BloomFilter.Compression.values()[headerBuffer.get()];
+    if (compression != BlockSplitBloomFilter.Compression.UNCOMPRESSED) {
+      return null;
+    }
+
     byte[] bitset = new byte[numBytes];
     f.readFully(bitset);
     return new BlockSplitBloomFilter(bitset);
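
With the compression field added, the serialized header checked above is four little-endian 32-bit integers (HEADER_SIZE = 16) followed by the bitset. A sketch of parsing it, assuming the header bytes have already been read into headerBytes:

    IntBuffer header = ByteBuffer.wrap(headerBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
    int numBytes     = header.get();   // bitset length in bytes
    int hashStrategy = header.get();   // 0 = XXH64
    int algorithm    = header.get();   // 0 = BLOCK
    int compression  = header.get();   // 0 = UNCOMPRESSED
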
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 8741aee..4a72134 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -23,8 +23,11 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -148,6 +151,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+  public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 
@@ -215,6 +219,20 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return getEnableDictionary(getConfiguration(jobContext));
   }
 
+  public static int getBloomFilterMaxBytes(Configuration conf) {
+    return conf.getInt(BLOOM_FILTER_MAX_BYTES,
+      ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES);
+  }
+
+  public static Set<String> getBloomFilterColumns(Configuration conf) {
+    String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+    if (columnNames != null) {
+      return new HashSet<>(Arrays.asList(columnNames.split(",")));
+    } else {
+      return new HashSet<>();
+    }
+  }
+
   public static Map<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
     Map<String, Long> kv = new HashMap<>();
     String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
@@ -443,7 +461,9 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         .withPageSize(getPageSize(conf))
         .withDictionaryPageSize(getDictionaryPageSize(conf))
         .withDictionaryEncoding(getEnableDictionary(conf))
-        .withBloomFilterInfo(getBloomFilterColumnExpectedNDVs(conf))
+        .withBloomFilterColumnNames(getBloomFilterColumns(conf))
+        .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
+        .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
         .withWriterVersion(getWriterVersion(conf))
         .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
@@ -469,7 +489,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
-      LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumnExpectedNDVs().keySet());
+      LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes());
+      LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumns());
       LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
         props.getBloomFilterColumnExpectedNDVs().values());
       LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7fb7186..638d4e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -538,6 +540,21 @@ public class ParquetWriter<T> implements Closeable {
     }
 
     /**
+     * Sets the column names for which the constructed writer writes Bloom filters.
+     *
+     * @return this builder for method chaining.
+     */
+    public SELF withBloomFilterColumnNames(String... columnNames) {
+      if (columnNames != null) {
+        encodingPropsBuilder.withBloomFilterColumnNames(
+          new HashSet<>(Arrays.asList(columnNames))
+        );
+      }
+
+      return self();
+    }
+
+    /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
      *
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index dd84e63..3de4524 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -222,14 +222,14 @@ public class TestParquetFileWriter {
   }
 
   @Test
-  public void testBloomWriteRead() throws Exception {
+  public void testBloomFilterWriteRead() throws Exception {
     MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
     File testFile = temp.newFile();
     testFile.delete();
     Path path = new Path(testFile.toURI());
     Configuration configuration = new Configuration();
-    configuration.set("parquet.bloomFilter.filter.column.names", "foo");
-    String colPath[] = {"foo"};
+    configuration.set("parquet.bloom.filter.column.names", "foo");
+    String[] colPath = {"foo"};
     ColumnDescriptor col = schema.getColumnDescription(colPath);
     BinaryStatistics stats1 = new BinaryStatistics();
     ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
@@ -239,19 +239,19 @@ public class TestParquetFileWriter {
     w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
     w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
-    BloomFilter bloomData = new BlockSplitBloomFilter(0);
-    bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
-    bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
-    w.writeBloomFilter(bloomData);
+    BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0);
+    blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello")));
+    blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world")));
+    w.writeBloomFilter(blockSplitBloomFilter);
     w.endBlock();
-    w.end(new HashMap<String, String>());
+    w.end(new HashMap<>());
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
     ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
       Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
     BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
     BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
-    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello"))));
-    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world"))));
+    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
   }
 
   @Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 25c9608..343b1fa 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -45,8 +47,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import net.openhft.hashing.LongHashFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.hadoop.example.ExampleInputFormat;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.schema.GroupType;
@@ -207,4 +212,43 @@ public class TestParquetWriter {
       assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
     }
   }
+
+  @Test
+  public void testParquetFileWithBloomFilter() throws IOException {
+    MessageType schema = Types.buildMessage().
+      required(BINARY).as(stringType()).named("name").named("msg");
+
+    String[] testNames = {"hello", "parquet", "bloom", "filter"};
+
+    final int recordCount = testNames.length;
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withPageRowCountLimit(10)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withBloomFilterColumnNames("name")
+      .build()) {
+      for (String testName : testNames) {
+        writer.write(factory.newGroup().append("name", testName));
+      }
+    }
+
+    ParquetMetadata footer = readFooter(conf, path, NO_FILTER);
+    ParquetFileReader reader = new ParquetFileReader(
+      conf, footer.getFileMetaData(), path, footer.getBlocks(), schema.getColumns());
+
+    BloomFilter bloomFilter = reader.getBloomFilterDataReader(footer.getBlocks().get(0))
+      .readBloomFilter(footer.getBlocks().get(0).getColumns().get(0));
+
+    for (String name: testNames) {
+      assertTrue(bloomFilter.findHash(
+        LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 5dd4aae..6ea28ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
     <hadoop1.version>1.2.1</hadoop1.version>
     <cascading.version>2.7.1</cascading.version>
     <cascading3.version>3.1.2</cascading3.version>
-    <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
+    <parquet.format.version>2.7.0</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
     <format.thrift.executable>thrift</format.thrift.executable>