Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/07 01:45:00 UTC

[jira] [Commented] (PARQUET-1342) Add bloom filter utility class

    [ https://issues.apache.org/jira/browse/PARQUET-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677510#comment-16677510 ] 

ASF GitHub Bot commented on PARQUET-1342:
-----------------------------------------

cjjnjust closed pull request #425: PARQUET-1342:Add bloom filter utility class
URL: https://github.com/apache/parquet-mr/pull/425
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below for the sake of provenance (it won't show otherwise due to GitHub magic):

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java
index 06b28b46a..d18ef559f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java
@@ -19,7 +19,7 @@
 
 package org.apache.parquet.cli.util;
 
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
@@ -385,7 +385,7 @@ public int hashCode() {
 
     @Override
     public String toString() {
-      return Objects.toStringHelper(this)
+      return MoreObjects.toStringHelper(this)
           .add("type", type)
           .add("value", value)
           .add("children", children)
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
new file mode 100644
index 000000000..8d9c1d9fb
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A Bloom filter is a compact structure that indicates whether an item is definitely not in a set
+ * or probably in a set. The BloomFilter class stores a bitset that represents a set of elements,
+ * a hash strategy and a Bloom filter algorithm.
+ *
+ * This Bloom filter is implemented with the block-based Bloom filter algorithm from Putze et al.'s
+ * "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash the item to a tiny
+ * Bloom filter whose size fits in a single cache line or smaller. This implementation sets 8 bits
+ * in each tiny Bloom filter, and each tiny Bloom filter is 32 bytes to take advantage of 32-byte
+ * SIMD instructions.
+ */
+
+public class BloomFilter {
+  // Bloom filter hash strategy.
+  public enum HashStrategy {
+    MURMUR3_X64_128,
+  }
+
+  // Bloom filter algorithm.
+  public enum Algorithm {
+    BLOCK,
+  }
+
+  // Bytes in a tiny Bloom filter block.
+  private static final int BYTES_PER_FILTER_BLOCK = 32;
+
+  // Default seed for the hash function; the value was taken from System.nanoTime().
+  private static final int DEFAULT_SEED = 1361930890;
+
+  // Minimum Bloom filter size, set to the size of a tiny Bloom filter block.
+  public static final int MINIMUM_BLOOM_FILTER_BYTES = 32;
+
+  // Maximum Bloom filter size, set to the default HDFS block size for the upper-boundary check.
+  // This should be reconsidered when implementing the write-side logic.
+  public static final int MAXIMUM_BLOOM_FILTER_BYTES = 128 * 1024 * 1024;
+
+  // The number of bits to set in a tiny Bloom filter
+  private static final int BITS_SET_PER_BLOCK = 8;
+
+  // Hash strategy used in this Bloom filter.
+  public final HashStrategy hashStrategy;
+
+  // Algorithm used in this Bloom filter.
+  public final Algorithm algorithm;
+
+  // The underlying byte array for Bloom filter bitset.
+  private byte[] bitset;
+
+  // An IntBuffer view of the underlying bitset to help set bits.
+  private IntBuffer intBuffer;
+
+  // Hash function used to compute the hash of a column value.
+  private HashFunction hashFunction;
+
+  // The block-based algorithm needs 8 odd SALT values to calculate the indexes of the eight
+  // bits to set, one bit in each 32-bit word of the block.
+  private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+  /**
+   * Constructor of Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. The value should be within
+   *                 [MINIMUM_BLOOM_FILTER_BYTES, MAXIMUM_BLOOM_FILTER_BYTES]; it is clamped to the
+   *                 lower/upper bound if it is out of range and is rounded up to a power of 2.
+   *                 This constructor uses murmur3_x64_128 as the default hash function and the
+   *                 block-based algorithm as the default algorithm.
+   */
+  public BloomFilter(int numBytes) {
+    this(numBytes, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK);
+  }
+
+  /**
+   * Constructor of Bloom filter with an explicitly specified hash strategy
+   * and algorithm.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset.
+   * @param hashStrategy The hash strategy of the Bloom filter.
+   * @param algorithm The algorithm of the Bloom filter.
+   */
+  private BloomFilter(int numBytes, HashStrategy hashStrategy, Algorithm algorithm) {
+    initBitset(numBytes);
+
+    switch (hashStrategy) {
+      case MURMUR3_X64_128:
+        this.hashStrategy = hashStrategy;
+        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        break;
+      default:
+        throw new RuntimeException("Not supported hash strategy");
+    }
+
+    this.algorithm = algorithm;
+  }
+
+
+  /**
+   * Construct the Bloom filter with a given bitset; this is used when reconstructing
+   * a Bloom filter from a Parquet file. It uses murmur3_x64_128 as the default hash
+   * function and the block-based algorithm as the default algorithm.
+   *
+   * @param bitset The given bitset to construct the Bloom filter from.
+   */
+  public BloomFilter(byte[] bitset) {
+    this(bitset, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK);
+  }
+
+  /**
+   * Construct the Bloom filter with a given bitset; this is used when reconstructing
+   * a Bloom filter from a Parquet file.
+   *
+   * @param bitset The given bitset to construct the Bloom filter from.
+   * @param hashStrategy The hash strategy the Bloom filter applies.
+   * @param algorithm The algorithm of the Bloom filter.
+   */
+  private BloomFilter(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) {
+    if (bitset == null) {
+      throw new RuntimeException("Given bitset is null");
+    }
+    this.bitset = bitset;
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+
+    switch (hashStrategy) {
+      case MURMUR3_X64_128:
+        this.hashStrategy = hashStrategy;
+        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    this.algorithm = algorithm;
+  }
+
+  /**
+   * Create a new bitset for the Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. The value should be within
+   *                 [MINIMUM_BLOOM_FILTER_BYTES, MAXIMUM_BLOOM_FILTER_BYTES]; it is clamped to the
+   *                 lower/upper bound if it is out of range and is rounded up to a power of 2, so
+   *                 that the number of 32-byte blocks is a power of 2 and a block index can be
+   *                 computed with a simple bit mask.
+   */
+  private void initBitset(int numBytes) {
+    if (numBytes < MINIMUM_BLOOM_FILTER_BYTES) {
+      numBytes = MINIMUM_BLOOM_FILTER_BYTES;
+    }
+
+    // Get the next power of 2 if it is not a power of 2.
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes) << 1;
+    }
+
+    if (numBytes > MAXIMUM_BLOOM_FILTER_BYTES || numBytes < 0) {
+      numBytes = MAXIMUM_BLOOM_FILTER_BYTES;
+    }
+
+    this.bitset = new byte[numBytes];
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+  }
+
+  /**
+   * Write the Bloom filter to an output stream. It writes the Bloom filter header, which consists of
+   * the bitset length in bytes, the hash strategy and the algorithm, followed by the bitset itself.
+   *
+   * @param out the output stream to write to
+   */
+  public void writeTo(OutputStream out) throws IOException {
+    // Write number of bytes of bitset.
+    out.write(BytesUtils.intToBytes(bitset.length));
+
+    // Write hash strategy
+    out.write(BytesUtils.intToBytes(this.hashStrategy.ordinal()));
+
+    // Write algorithm
+    out.write(BytesUtils.intToBytes(this.algorithm.ordinal()));
+
+    // Write bitset
+    out.write(bitset);
+  }
+
+  private int[] setMask(int key) {
+    int mask[] = new int[BITS_SET_PER_BLOCK];
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = key * SALT[i];
+    }
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = mask[i] >>> 27;
+    }
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = 0x1 << mask[i];
+    }
+
+    return mask;
+  }
+
+  /**
+   * Add an element to the Bloom filter. The element is represented by
+   * the hash value of its plain encoding result.
+   *
+   * @param hash the hash result of the element.
+   */
+  public void insert(long hash) {
+    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1);
+    int key = (int)hash;
+
+    // Calculate mask for bucket.
+    int mask[] = setMask(key);
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      int value = intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i);
+      value |= mask[i];
+      intBuffer.put(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i, value);
+    }
+  }
+
+  /**
+   * Determine whether an element is in the set or not.
+   *
+   * @param hash the hash value of the element's plain encoding result.
+   * @return false if the element is definitely not in the set, true if it is probably in the set.
+   */
+  public boolean find(long hash) {
+    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1);
+    int key = (int)hash;
+
+    // Calculate mask for the tiny Bloom filter.
+    int mask[] = setMask(key);
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i) & mask[i])) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Calculate the optimal size according to the number of distinct values and the false positive probability.
+   *
+   * @param n The number of distinct values.
+   * @param p The false positive probability.
+   * @return the optimal number of bits for the given n and p.
+   */
+  public static int optimalNumOfBits(long n, double p) {
+    Preconditions.checkArgument((p > 0.0 && p < 1.0),
+      "FPP should be less than 1.0 and great than 0.0");
+
+    final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
+    final double MAX = MAXIMUM_BLOOM_FILTER_BYTES << 3;
+    int numBits = (int)m;
+
+    // Handle overflow.
+    if (m > MAX || m < 0) {
+      numBits = (int)MAX;
+    }
+
+    // Round up to the next power of 2 if numBits is not a power of 2.
+    if ((numBits & (numBits - 1)) != 0) {
+      numBits = Integer.highestOneBit(numBits) << 1;
+    }
+
+    if (numBits < (MINIMUM_BLOOM_FILTER_BYTES << 3)) {
+      numBits = MINIMUM_BLOOM_FILTER_BYTES << 3;
+    }
+
+    return numBits;
+  }
+
+  /**
+   * Get the number of bytes for bitset in this Bloom filter.
+   *
+   * @return The number of bytes for bitset in this Bloom filter.
+   */
+  public long getBitsetSize() {
+    return bitset.length;
+  }
+
+  /**
+   * Compute hash for int value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  public long hash(int value) {
+    ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  /**
+   * Compute hash for long value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  public long hash(long value) {
+    ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  /**
+   * Compute hash for double value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  public long hash(double value) {
+    ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  /**
+   * Compute hash for float value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  public long hash(float value) {
+    ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  /**
+   * Compute hash for Binary value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  public long hash(Binary value) {
+    return hashFunction.hashBytes(value.toByteBuffer()).asLong();
+  }
+}
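
To make the block-based scheme concrete: find() and insert() split the 64-bit hash so that
the upper 32 bits select one 32-byte block and the lower 32 bits, multiplied by the odd SALT
constants, select one bit in each of the block's eight 32-bit words. A minimal, standalone
sketch of that index arithmetic (the class name BlockSplitDemo and the sample values are
illustrative only, not part of the patch):

    public class BlockSplitDemo {
      // Same odd constants as SALT in BloomFilter above.
      private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
          0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

      public static void main(String[] args) {
        int numBlocks = 1024 / 32;                // a 1024-byte filter holds 32 blocks
        long hash = 0x1234567890abcdefL;          // stand-in for a murmur3_x64_128 result
        int bucketIndex = (int) (hash >> 32) & (numBlocks - 1); // upper 32 bits pick the block
        int key = (int) hash;                                   // lower 32 bits drive the mask
        for (int i = 0; i < 8; i++) {
          int bitPos = (key * SALT[i]) >>> 27;    // top 5 bits of the product: a position in [0, 31]
          System.out.println("word " + i + " of block " + bucketIndex + " -> bit " + bitPos);
        }
      }
    }

Because the bitset size is rounded to a power of 2, the number of blocks is also a power of 2,
so the masking with (numBlocks - 1) above is equivalent to a modulo.
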
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java
new file mode 100644
index 000000000..ab4d89bdd
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.parquet.column.values.RandomStr;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBloomFilter {
+
+  @Test
+  public void testConstructor () throws IOException {
+    BloomFilter bloomFilter1 = new BloomFilter(0);
+    assertEquals(bloomFilter1.getBitsetSize(), BloomFilter.MINIMUM_BLOOM_FILTER_BYTES);
+
+    BloomFilter bloomFilter2 = new BloomFilter(256 * 1024 * 1024);
+    assertEquals(bloomFilter2.getBitsetSize(), BloomFilter.MAXIMUM_BLOOM_FILTER_BYTES);
+
+    BloomFilter bloomFilter3 = new BloomFilter(1000);
+    assertEquals(bloomFilter3.getBitsetSize(), 1024);
+  }
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+  /*
+   * This test exercises the basic operations: inserting, finding, and
+   * serializing/de-serializing.
+   */
+  @Test
+  public void testBasic () throws IOException {
+    final String testStrings[] = {"hello", "parquet", "bloom", "filter"};
+    BloomFilter bloomFilter = new BloomFilter(1024);
+
+    for(int i = 0; i < testStrings.length; i++) {
+      bloomFilter.insert(bloomFilter.hash(Binary.fromString(testStrings[i])));
+    }
+
+    File testFile = temp.newFile();
+    FileOutputStream fileOutputStream = new FileOutputStream(testFile);
+    bloomFilter.writeTo(fileOutputStream);
+    fileOutputStream.close();
+
+    FileInputStream fileInputStream = new FileInputStream(testFile);
+
+    byte[] value = new byte[4];
+
+    fileInputStream.read(value);
+    int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(length, 1024);
+
+    fileInputStream.read(value);
+    int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal());
+
+    fileInputStream.read(value);
+    int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+
+    byte[] bitset = new byte[length];
+    fileInputStream.read(bitset);
+    bloomFilter = new BloomFilter(bitset);
+
+    for(int i = 0; i < testStrings.length; i++) {
+      assertTrue(bloomFilter.find(bloomFilter.hash(Binary.fromString(testStrings[i]))));
+    }
+  }
+
+  @Test
+  public void testFPP() throws IOException {
+    final int totalCount = 100000;
+    final double FPP = 0.01;
+    final long SEED = 104729;
+
+    BloomFilter bloomFilter = new BloomFilter(BloomFilter.optimalNumOfBits(totalCount, FPP));
+    List<String> strings = new ArrayList<>();
+    RandomStr randomStr = new RandomStr(new Random(SEED));
+    for(int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(10);
+      strings.add(str);
+      bloomFilter.insert(bloomFilter.hash(Binary.fromString(str)));
+    }
+
+    // exist counts the number of times find() returns true.
+    int exist = 0;
+    for (int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(8);
+      if (bloomFilter.find(bloomFilter.hash(Binary.fromString(str)))) {
+        exist ++;
+      }
+    }
+
+    // exist should be less than about 1000 (totalCount * FPP) given an FPP of 0.01.
+    assertTrue(exist < totalCount * FPP);
+  }
+}
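
The sizing used in testFPP comes from optimalNumOfBits, which evaluates
m = -8n / ln(1 - p^(1/8)) and then rounds up to a power of 2. A standalone sketch of that
arithmetic for the same n and p as above (the class name OptimalBitsDemo is illustrative
only, not part of the patch):

    public class OptimalBitsDemo {
      public static void main(String[] args) {
        long n = 100000;   // number of distinct values, as in testFPP
        double p = 0.01;   // target false positive probability, as in testFPP
        double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
        int numBits = (int) m;                            // roughly 968,000 bits
        if ((numBits & (numBits - 1)) != 0) {
          numBits = Integer.highestOneBit(numBits) << 1;  // round up to a power of 2
        }
        // Prints the raw estimate and the rounded size: 1048576 bits = 131072 bytes (128 KiB).
        System.out.println("raw = " + (long) m + " bits, rounded = " + numBits
            + " bits (" + numBits / 8 + " bytes)");
      }
    }

Note that testFPP passes this bit count straight to the BloomFilter(int numBytes) constructor,
which interprets it as a byte count, so the filter built there is eight times larger than the
formula suggests; the assertion on the observed false positive rate still holds, just with
extra margin.
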
diff --git a/pom.xml b/pom.xml
index 7b3f36fe5..21c399d3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
     <semver.api.version>0.9.33</semver.api.version>
     <slf4j.version>1.7.22</slf4j.version>
     <avro.version>1.8.2</avro.version>
-    <guava.version>20.0</guava.version>
+    <guava.version>23.0</guava.version>
     <brotli-codec.version>0.1.1</brotli-codec.version>
     <mockito.version>1.10.19</mockito.version>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add bloom filter utility class
> ------------------------------
>
>                 Key: PARQUET-1342
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1342
>             Project: Parquet
>          Issue Type: Sub-task
>          Components: parquet-mr
>            Reporter: Junjie Chen
>            Assignee: Junjie Chen
>            Priority: Major
>              Labels: pull-request-available
>
> This Jira is used to track the bloom filter utility class in parquet-mr.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)