You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/08 00:35:00 UTC

[GitHub] leventov commented on a change in pull request #6584: fix druid-bloom-filter thread-safety

leventov commented on a change in pull request #6584: fix druid-bloom-filter thread-safety
URL: https://github.com/apache/incubator-druid/pull/6584#discussion_r231730162
 
 

 ##########
 File path: extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
 ##########
 @@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.hive.common.util.Murmur3;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+/**
+ * This is a direct modification of the Apache Hive 'BloomKFilter', found at:
+ * https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
+ * modified to store variables which are re-used instead of re-allocated per call as {@link ThreadLocal} so multiple
+ * threads can share the same filter object. Note that this is snapshot at hive-storag-api version 2.7.0, 3.2.0+ break
+ * compatibility with how int/float are stored in a bloom filter in this commit:
+ * https://github.com/apache/hive/commit/87ce36b458350db141c4cb4b6336a9a01796370f#diff-e65fc506757ee058dc951d15a9a526c3L238
+ * as part of this issue https://issues.apache.org/jira/browse/HIVE-20101
+ * begin copy-pasta:
+ *
+ * BloomKFilter is variation of {@link org.apache.hive.common.util.BloomFilter}. Unlike BloomFilter, BloomKFilter will spread
+ * 'k' hash bits within same cache line for better L1 cache performance. The way it works is,
+ * First hash code is computed from key which is used to locate the block offset (n-longs in bitset constitute a block)
+ * Subsequent 'k' hash codes are used to spread hash bits within the block. By default block size is chosen as 8,
+ * which is to match cache line size (8 longs = 64 bytes = cache line size).
+ * Refer {@link BloomKFilter#addBytes(byte[])} for more info.
+ *
+ * This implementation has much lesser L1 data cache misses than {@link org.apache.hive.common.util.BloomFilter}.
+ */
+public class BloomKFilter
+{
+  public static final float DEFAULT_FPP = 0.05f;
+  // Given a byte array consisting of a serialized BloomKFilter, gives the offset (from 0)
+  // for the start of the serialized long values that make up the bitset.
+  // NumHashFunctions (1 byte) + bitset array length (4 bytes)
+  public static final int START_OF_SERIALIZED_LONGS = 5;
+  private static final int DEFAULT_BLOCK_SIZE = 8;
+  private static final int DEFAULT_BLOCK_SIZE_BITS = (int) (Math.log(DEFAULT_BLOCK_SIZE) / Math.log(2));
+  private static final int DEFAULT_BLOCK_OFFSET_MASK = DEFAULT_BLOCK_SIZE - 1;
+  private static final int DEFAULT_BIT_OFFSET_MASK = Long.SIZE - 1;
+  private final ThreadLocal<byte[]> BYTE_ARRAY_4 = ThreadLocal.withInitial(() -> new byte[4]);
+  private final ThreadLocal<long[]> masks = ThreadLocal.withInitial(() -> new long[DEFAULT_BLOCK_SIZE]);
+  private final BitSet bitSet;
+  private final int m;
+  private final int k;
+  // spread k-1 bits to adjacent longs, default is 8
+  // spreading hash bits within blockSize * longs will make bloom filter L1 cache friendly
+  // default block size is set to 8 as most cache line sizes are 64 bytes and also AVX512 friendly
+  private final int totalBlockCount;
+
+  public BloomKFilter(long maxNumEntries)
+  {
+    checkArgument(maxNumEntries > 0, "expectedEntries should be > 0");
+    long numBits = optimalNumOfBits(maxNumEntries, DEFAULT_FPP);
+    this.k = optimalNumOfHashFunctions(maxNumEntries, numBits);
+    int nLongs = (int) Math.ceil((double) numBits / (double) Long.SIZE);
+    // additional bits to pad long array to block size
+    int padLongs = DEFAULT_BLOCK_SIZE - nLongs % DEFAULT_BLOCK_SIZE;
+    this.m = (nLongs + padLongs) * Long.SIZE;
+    this.bitSet = new BitSet(m);
+    checkArgument((bitSet.data.length % DEFAULT_BLOCK_SIZE) == 0, "bitSet has to be block aligned");
+    this.totalBlockCount = bitSet.data.length / DEFAULT_BLOCK_SIZE;
+  }
+
+  /**
+   * A constructor to support rebuilding the BloomFilter from a serialized representation.
+   *
+   * @param bits
+   * @param numFuncs
+   */
+  public BloomKFilter(long[] bits, int numFuncs)
+  {
+    super();
+    bitSet = new BitSet(bits);
+    this.m = bits.length * Long.SIZE;
+    this.k = numFuncs;
+    checkArgument((bitSet.data.length % DEFAULT_BLOCK_SIZE) == 0, "bitSet has to be block aligned");
+    this.totalBlockCount = bitSet.data.length / DEFAULT_BLOCK_SIZE;
+  }
+
+  static void checkArgument(boolean expression, String message)
+  {
+    if (!expression) {
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  static int optimalNumOfHashFunctions(long n, long m)
+  {
+    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+  }
+
+  static long optimalNumOfBits(long n, double p)
+  {
+    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+  }
+
+  /**
+   * Serialize a bloom filter
+   *
+   * @param out         output stream to write to
+   * @param bloomFilter BloomKFilter that needs to be seralized
+   */
+  public static void serialize(OutputStream out, BloomKFilter bloomFilter) throws IOException
+  {
+    /**
+     * Serialized BloomKFilter format:
+     * 1 byte for the number of hash functions.
+     * 1 big endian int(That is how OutputStream works) for the number of longs in the bitset
+     * big endina longs in the BloomKFilter bitset
+     */
+    DataOutputStream dataOutputStream = new DataOutputStream(out);
+    dataOutputStream.writeByte(bloomFilter.k);
+    dataOutputStream.writeInt(bloomFilter.getBitSet().length);
+    for (long value : bloomFilter.getBitSet()) {
+      dataOutputStream.writeLong(value);
+    }
+  }
+
+  /**
+   * Deserialize a bloom filter
+   * Read a byte stream, which was written by {@linkplain #serialize(OutputStream, BloomKFilter)}
+   * into a {@code BloomKFilter}
+   *
+   * @param in input bytestream
+   *
+   * @return deserialized BloomKFilter
+   */
+  public static BloomKFilter deserialize(InputStream in) throws IOException
+  {
+    if (in == null) {
+      throw new IOException("Input stream is null");
+    }
+
+    try {
+      DataInputStream dataInputStream = new DataInputStream(in);
+      int numHashFunc = dataInputStream.readByte();
+      int bitsetArrayLen = dataInputStream.readInt();
+      long[] data = new long[bitsetArrayLen];
+      for (int i = 0; i < bitsetArrayLen; i++) {
+        data[i] = dataInputStream.readLong();
+      }
+      return new BloomKFilter(data, numHashFunc);
+    }
+    catch (RuntimeException e) {
+      IOException io = new IOException("Unable to deserialize BloomKFilter");
+      io.initCause(e);
+      throw io;
+    }
+  }
+
+  /**
+   * Merges BloomKFilter bf2 into bf1.
+   * Assumes 2 BloomKFilters with the same size/hash functions are serialized to byte arrays
+   *
+   * @param bf1Bytes
+   * @param bf1Start
+   * @param bf1Length
+   * @param bf2Bytes
+   * @param bf2Start
+   * @param bf2Length
+   */
+  public static void mergeBloomFilterBytes(
+      byte[] bf1Bytes,
+      int bf1Start,
+      int bf1Length,
+      byte[] bf2Bytes,
+      int bf2Start,
+      int bf2Length
+  )
+  {
+    if (bf1Length != bf2Length) {
+      throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
+    }
+
+    // Validation on the bitset size/3 hash functions.
+    for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+      if (bf1Bytes[bf1Start + idx] != bf2Bytes[bf2Start + idx]) {
+        throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
+      }
+    }
+
+    // Just bitwise-OR the bits together - size/# functions should be the same,
+    // rest of the data is serialized long values for the bitset which are supposed to be bitwise-ORed.
+    for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+      bf1Bytes[bf1Start + idx] |= bf2Bytes[bf2Start + idx];
+    }
+  }
+
+  public void add(byte[] val)
+  {
+    addBytes(val);
+  }
+
+  public void addBytes(byte[] val, int offset, int length)
+  {
+    // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
+    // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively
+    // implement a Bloom filter without any loss in the asymptotic false positive probability'
+
+    // Lets split up 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
+    // in the above paper
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+                  Murmur3.hash64(val, offset, length);
+    addHash(hash64);
+  }
+
+  public void addBytes(byte[] val)
+  {
+    addBytes(val, 0, val.length);
+  }
+
+  private void addHash(long hash64)
+  {
+    final int hash1 = (int) hash64;
+    final int hash2 = (int) (hash64 >>> 32);
+
+    int firstHash = hash1 + hash2;
+    // hashcode should be positive, flip all the bits if it's negative
+    if (firstHash < 0) {
+      firstHash = ~firstHash;
+    }
+
+    // first hash is used to locate start of the block (blockBaseOffset)
+    // subsequent K hashes are used to generate K bits within a block of words
+    final int blockIdx = firstHash % totalBlockCount;
+    final int blockBaseOffset = blockIdx << DEFAULT_BLOCK_SIZE_BITS;
+    for (int i = 1; i <= k; i++) {
+      int combinedHash = hash1 + ((i + 1) * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      // LSB 3 bits is used to locate offset within the block
+      final int absOffset = blockBaseOffset + (combinedHash & DEFAULT_BLOCK_OFFSET_MASK);
+      // Next 6 bits are used to locate offset within a long/word
+      final int bitPos = (combinedHash >>> DEFAULT_BLOCK_SIZE_BITS) & DEFAULT_BIT_OFFSET_MASK;
+      bitSet.data[absOffset] |= (1L << bitPos);
+    }
+  }
+
+  public void addString(String val)
+  {
+    addBytes(StringUtils.toUtf8(val));
+  }
+
+  public void addByte(byte val)
+  {
+    addBytes(new byte[]{val});
+  }
+
+  public void addInt(int val)
+  {
+    // puts int in little endian order
+    addBytes(intToByteArrayLE(val));
+  }
+
+  public void addLong(long val)
+  {
+    // puts long in little endian order
+    addHash(Murmur3.hash64(val));
+  }
+
+  public void addFloat(float val)
+  {
+    addInt(Float.floatToIntBits(val));
+  }
+
+  public void addDouble(double val)
+  {
+    addLong(Double.doubleToLongBits(val));
+  }
+
+  public boolean test(byte[] val)
+  {
+    return testBytes(val);
+  }
+
+  public boolean testBytes(byte[] val)
+  {
+    return testBytes(val, 0, val.length);
+  }
+
+  public boolean testBytes(byte[] val, int offset, int length)
+  {
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
 
 Review comment:
   Particular reason why Murmur3 is used? xxHash from https://github.com/OpenHFT/Zero-Allocation-Hashing is much faster and allows to hash for example integers directly garbage-free, without resorting to ThreadLocals such as `BYTE_ARRAY_4`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org