You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/11/13 22:29:43 UTC

[incubator-druid] branch 0.13.0-incubating updated: fix druid-bloom-filter thread-safety (#6584) (#6619)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
     new 1887e75  fix druid-bloom-filter thread-safety (#6584) (#6619)
1887e75 is described below

commit 1887e752fc4ad7dbd2aca2f6bf331143e93d020e
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Tue Nov 13 14:29:35 2018 -0800

    fix druid-bloom-filter thread-safety (#6584) (#6619)
    
    * use BloomFilter instead of BloomKFilter since the latters test method is not threadsafe
    
    * fix formatting
    
    * style and forbidden api
    
    * remove redundant hive notice entry
    
    * add todo with note to delete copied implementation and link to related hive jira
    
    * better fix for masks than ThreadLocal
---
 NOTICE                                             |   4 +-
 .../druid/guice/BloomFilterSerializersModule.java  |   2 +-
 .../apache/druid/query/filter/BloomDimFilter.java  |   1 -
 .../apache/druid/query/filter/BloomKFilter.java    | 524 +++++++++++++++++++++
 .../druid/query/filter/BloomKFilterHolder.java     |   1 -
 .../druid/query/filter/BloomDimFilterTest.java     |  74 ++-
 6 files changed, 600 insertions(+), 6 deletions(-)

diff --git a/NOTICE b/NOTICE
index 92d1328..8207dac 100644
--- a/NOTICE
+++ b/NOTICE
@@ -12,7 +12,7 @@ This product contains a modified version of Andrew Duffy's java-alphanum library
   * HOMEPAGE:
     * https://github.com/amjjd/java-alphanum
 
-This product contains conjunctive normal form conversion code and a variance aggregator algorithm adapted from Apache Hive
+This product contains conjunctive normal form conversion code, a variance aggregator algorithm, and bloom filter adapted from Apache Hive
   * LICENSE:
     * https://github.com/apache/hive/blob/branch-2.0/LICENSE (Apache License, Version 2.0)
   * HOMEPAGE:
@@ -91,4 +91,4 @@ This product contains modified versions of the Dockerfile and related configurat
   * HOMEPAGE:
     * https://github.com/sequenceiq/hadoop-docker/
   * COMMIT TAG:
-    * update this when this patch is committed
\ No newline at end of file
+    * update this when this patch is committed
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java
index 011e091..7318ea4 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java
@@ -27,8 +27,8 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.druid.query.filter.BloomDimFilter;
+import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.query.filter.BloomKFilterHolder;
-import org.apache.hive.common.util.BloomKFilter;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java
index 383488c..f235b59 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java
@@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.segment.filter.DimensionPredicateFilter;
-import org.apache.hive.common.util.BloomKFilter;
 
 import java.util.HashSet;
 
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
new file mode 100644
index 0000000..3733145
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.hive.common.util.Murmur3;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+/**
+ * This is a direct modification of the Apache Hive 'BloomKFilter', found at:
+ * https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
+ * modified to store variables which are re-used instead of re-allocated per call as {@link ThreadLocal} so multiple
+ * threads can share the same filter object. Note that this is snapshot at hive-storag-api version 2.7.0, latest
+ * versions break compatibility with how int/float are stored in a bloom filter in this commit:
+ * https://github.com/apache/hive/commit/87ce36b458350db141c4cb4b6336a9a01796370f#diff-e65fc506757ee058dc951d15a9a526c3L238
+ * and this linked issue https://issues.apache.org/jira/browse/HIVE-20101.
+ *
+ * Todo: remove this and begin using hive-storage-api version again once https://issues.apache.org/jira/browse/HIVE-20893 is released
+ *
+ * begin copy-pasta:
+ *
+ * BloomKFilter is variation of {@link org.apache.hive.common.util.BloomFilter}. Unlike BloomFilter, BloomKFilter will spread
+ * 'k' hash bits within same cache line for better L1 cache performance. The way it works is,
+ * First hash code is computed from key which is used to locate the block offset (n-longs in bitset constitute a block)
+ * Subsequent 'k' hash codes are used to spread hash bits within the block. By default block size is chosen as 8,
+ * which is to match cache line size (8 longs = 64 bytes = cache line size).
+ * Refer {@link BloomKFilter#addBytes(byte[])} for more info.
+ *
+ * This implementation has much lesser L1 data cache misses than {@link org.apache.hive.common.util.BloomFilter}.
+ */
+public class BloomKFilter
+{
+  public static final float DEFAULT_FPP = 0.05f;
+  // Given a byte array consisting of a serialized BloomKFilter, gives the offset (from 0)
+  // for the start of the serialized long values that make up the bitset.
+  // NumHashFunctions (1 byte) + bitset array length (4 bytes)
+  public static final int START_OF_SERIALIZED_LONGS = 5;
+  private static final int DEFAULT_BLOCK_SIZE = 8;
+  private static final int DEFAULT_BLOCK_SIZE_BITS = (int) (Math.log(DEFAULT_BLOCK_SIZE) / Math.log(2));
+  private static final int DEFAULT_BLOCK_OFFSET_MASK = DEFAULT_BLOCK_SIZE - 1;
+  private static final int DEFAULT_BIT_OFFSET_MASK = Long.SIZE - 1;
+  private final ThreadLocal<byte[]> BYTE_ARRAY_4 = ThreadLocal.withInitial(() -> new byte[4]);
+  private final BitSet bitSet;
+  private final int m;
+  private final int k;
+  // spread k-1 bits to adjacent longs, default is 8
+  // spreading hash bits within blockSize * longs will make bloom filter L1 cache friendly
+  // default block size is set to 8 as most cache line sizes are 64 bytes and also AVX512 friendly
+  private final int totalBlockCount;
+
+  public BloomKFilter(long maxNumEntries)
+  {
+    checkArgument(maxNumEntries > 0, "expectedEntries should be > 0");
+    long numBits = optimalNumOfBits(maxNumEntries, DEFAULT_FPP);
+    this.k = optimalNumOfHashFunctions(maxNumEntries, numBits);
+    int nLongs = (int) Math.ceil((double) numBits / (double) Long.SIZE);
+    // additional bits to pad long array to block size
+    int padLongs = DEFAULT_BLOCK_SIZE - nLongs % DEFAULT_BLOCK_SIZE;
+    this.m = (nLongs + padLongs) * Long.SIZE;
+    this.bitSet = new BitSet(m);
+    checkArgument((bitSet.data.length % DEFAULT_BLOCK_SIZE) == 0, "bitSet has to be block aligned");
+    this.totalBlockCount = bitSet.data.length / DEFAULT_BLOCK_SIZE;
+  }
+
+  /**
+   * A constructor to support rebuilding the BloomFilter from a serialized representation.
+   *
+   * @param bits
+   * @param numFuncs
+   */
+  public BloomKFilter(long[] bits, int numFuncs)
+  {
+    super();
+    bitSet = new BitSet(bits);
+    this.m = bits.length * Long.SIZE;
+    this.k = numFuncs;
+    checkArgument((bitSet.data.length % DEFAULT_BLOCK_SIZE) == 0, "bitSet has to be block aligned");
+    this.totalBlockCount = bitSet.data.length / DEFAULT_BLOCK_SIZE;
+  }
+
+  static void checkArgument(boolean expression, String message)
+  {
+    if (!expression) {
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  static int optimalNumOfHashFunctions(long n, long m)
+  {
+    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+  }
+
+  static long optimalNumOfBits(long n, double p)
+  {
+    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+  }
+
+  /**
+   * Serialize a bloom filter
+   *
+   * @param out         output stream to write to
+   * @param bloomFilter BloomKFilter that needs to be seralized
+   */
+  public static void serialize(OutputStream out, BloomKFilter bloomFilter) throws IOException
+  {
+    /**
+     * Serialized BloomKFilter format:
+     * 1 byte for the number of hash functions.
+     * 1 big endian int(That is how OutputStream works) for the number of longs in the bitset
+     * big endina longs in the BloomKFilter bitset
+     */
+    DataOutputStream dataOutputStream = new DataOutputStream(out);
+    dataOutputStream.writeByte(bloomFilter.k);
+    dataOutputStream.writeInt(bloomFilter.getBitSet().length);
+    for (long value : bloomFilter.getBitSet()) {
+      dataOutputStream.writeLong(value);
+    }
+  }
+
+  /**
+   * Deserialize a bloom filter
+   * Read a byte stream, which was written by {@linkplain #serialize(OutputStream, BloomKFilter)}
+   * into a {@code BloomKFilter}
+   *
+   * @param in input bytestream
+   *
+   * @return deserialized BloomKFilter
+   */
+  public static BloomKFilter deserialize(InputStream in) throws IOException
+  {
+    if (in == null) {
+      throw new IOException("Input stream is null");
+    }
+
+    try {
+      DataInputStream dataInputStream = new DataInputStream(in);
+      int numHashFunc = dataInputStream.readByte();
+      int bitsetArrayLen = dataInputStream.readInt();
+      long[] data = new long[bitsetArrayLen];
+      for (int i = 0; i < bitsetArrayLen; i++) {
+        data[i] = dataInputStream.readLong();
+      }
+      return new BloomKFilter(data, numHashFunc);
+    }
+    catch (RuntimeException e) {
+      IOException io = new IOException("Unable to deserialize BloomKFilter");
+      io.initCause(e);
+      throw io;
+    }
+  }
+
+  /**
+   * Merges BloomKFilter bf2 into bf1.
+   * Assumes 2 BloomKFilters with the same size/hash functions are serialized to byte arrays
+   *
+   * @param bf1Bytes
+   * @param bf1Start
+   * @param bf1Length
+   * @param bf2Bytes
+   * @param bf2Start
+   * @param bf2Length
+   */
+  public static void mergeBloomFilterBytes(
+      byte[] bf1Bytes,
+      int bf1Start,
+      int bf1Length,
+      byte[] bf2Bytes,
+      int bf2Start,
+      int bf2Length
+  )
+  {
+    if (bf1Length != bf2Length) {
+      throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
+    }
+
+    // Validation on the bitset size/3 hash functions.
+    for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+      if (bf1Bytes[bf1Start + idx] != bf2Bytes[bf2Start + idx]) {
+        throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
+      }
+    }
+
+    // Just bitwise-OR the bits together - size/# functions should be the same,
+    // rest of the data is serialized long values for the bitset which are supposed to be bitwise-ORed.
+    for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+      bf1Bytes[bf1Start + idx] |= bf2Bytes[bf2Start + idx];
+    }
+  }
+
+  public void add(byte[] val)
+  {
+    addBytes(val);
+  }
+
+  public void addBytes(byte[] val, int offset, int length)
+  {
+    // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
+    // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively
+    // implement a Bloom filter without any loss in the asymptotic false positive probability'
+
+    // Lets split up 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
+    // in the above paper
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+                  Murmur3.hash64(val, offset, length);
+    addHash(hash64);
+  }
+
+  public void addBytes(byte[] val)
+  {
+    addBytes(val, 0, val.length);
+  }
+
+  private void addHash(long hash64)
+  {
+    final int hash1 = (int) hash64;
+    final int hash2 = (int) (hash64 >>> 32);
+
+    int firstHash = hash1 + hash2;
+    // hashcode should be positive, flip all the bits if it's negative
+    if (firstHash < 0) {
+      firstHash = ~firstHash;
+    }
+
+    // first hash is used to locate start of the block (blockBaseOffset)
+    // subsequent K hashes are used to generate K bits within a block of words
+    final int blockIdx = firstHash % totalBlockCount;
+    final int blockBaseOffset = blockIdx << DEFAULT_BLOCK_SIZE_BITS;
+    for (int i = 1; i <= k; i++) {
+      int combinedHash = hash1 + ((i + 1) * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      // LSB 3 bits is used to locate offset within the block
+      final int absOffset = blockBaseOffset + (combinedHash & DEFAULT_BLOCK_OFFSET_MASK);
+      // Next 6 bits are used to locate offset within a long/word
+      final int bitPos = (combinedHash >>> DEFAULT_BLOCK_SIZE_BITS) & DEFAULT_BIT_OFFSET_MASK;
+      bitSet.data[absOffset] |= (1L << bitPos);
+    }
+  }
+
+  public void addString(String val)
+  {
+    addBytes(StringUtils.toUtf8(val));
+  }
+
+  public void addByte(byte val)
+  {
+    addBytes(new byte[]{val});
+  }
+
+  public void addInt(int val)
+  {
+    // puts int in little endian order
+    addBytes(intToByteArrayLE(val));
+  }
+
+  public void addLong(long val)
+  {
+    // puts long in little endian order
+    addHash(Murmur3.hash64(val));
+  }
+
+  public void addFloat(float val)
+  {
+    addInt(Float.floatToIntBits(val));
+  }
+
+  public void addDouble(double val)
+  {
+    addLong(Double.doubleToLongBits(val));
+  }
+
+  public boolean test(byte[] val)
+  {
+    return testBytes(val);
+  }
+
+  public boolean testBytes(byte[] val)
+  {
+    return testBytes(val, 0, val.length);
+  }
+
+  public boolean testBytes(byte[] val, int offset, int length)
+  {
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+                  Murmur3.hash64(val, offset, length);
+    return testHash(hash64);
+  }
+
+  private boolean testHash(long hash64)
+  {
+    final int hash1 = (int) hash64;
+    final int hash2 = (int) (hash64 >>> 32);
+
+    int firstHash = hash1 + hash2;
+    // hashcode should be positive, flip all the bits if it's negative
+    if (firstHash < 0) {
+      firstHash = ~firstHash;
+    }
+
+    // first hash is used to locate start of the block (blockBaseOffset)
+    // subsequent K hashes are used to generate K bits within a block of words
+    // To avoid branches during probe, a separate masks array is used for each longs/words within a block.
+    // data array and masks array are then traversed together and checked for corresponding set bits.
+    final int blockIdx = firstHash % totalBlockCount;
+    final int blockBaseOffset = blockIdx << DEFAULT_BLOCK_SIZE_BITS;
+
+
+    // iterate and update masks array
+    for (int i = 1; i <= k; i++) {
+      int combinedHash = hash1 + ((i + 1) * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      // LSB 3 bits is used to locate offset within the block
+      final int wordOffset = combinedHash & DEFAULT_BLOCK_OFFSET_MASK;
+      final int absOffset = blockBaseOffset + wordOffset;
+
+      // Next 6 bits are used to locate offset within a long/word
+      final int bitPos = (combinedHash >>> DEFAULT_BLOCK_SIZE_BITS) & DEFAULT_BIT_OFFSET_MASK;
+      final long bloomWord = bitSet.data[absOffset];
+      if (0 == (bloomWord & (1L << bitPos))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean testString(String val)
+  {
+    return testBytes(StringUtils.toUtf8(val));
+  }
+
+  public boolean testByte(byte val)
+  {
+    return testBytes(new byte[]{val});
+  }
+
+  public boolean testInt(int val)
+  {
+    return testBytes(intToByteArrayLE(val));
+  }
+
+  public boolean testLong(long val)
+  {
+    return testHash(Murmur3.hash64(val));
+  }
+
+  public boolean testFloat(float val)
+  {
+    return testInt(Float.floatToIntBits(val));
+  }
+
+  public boolean testDouble(double val)
+  {
+    return testLong(Double.doubleToLongBits(val));
+  }
+
+  private byte[] intToByteArrayLE(int val)
+  {
+    byte[] bytes = BYTE_ARRAY_4.get();
+    bytes[0] = (byte) (val >> 0);
+    bytes[1] = (byte) (val >> 8);
+    bytes[2] = (byte) (val >> 16);
+    bytes[3] = (byte) (val >> 24);
+    return bytes;
+  }
+
+  public long sizeInBytes()
+  {
+    return getBitSize() / 8;
+  }
+
+  public int getBitSize()
+  {
+    return bitSet.getData().length * Long.SIZE;
+  }
+
+  public int getNumHashFunctions()
+  {
+    return k;
+  }
+
+  public int getNumBits()
+  {
+    return m;
+  }
+
+  public long[] getBitSet()
+  {
+    return bitSet.getData();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "m: " + m + " k: " + k;
+  }
+
+  /**
+   * Merge the specified bloom filter with current bloom filter.
+   *
+   * @param that - bloom filter to merge
+   */
+  public void merge(BloomKFilter that)
+  {
+    if (this != that && this.m == that.m && this.k == that.k) {
+      this.bitSet.putAll(that.bitSet);
+    } else {
+      throw new IllegalArgumentException("BloomKFilters are not compatible for merging." +
+                                         " this - " + this.toString() + " that - " + that.toString());
+    }
+  }
+
+  public void reset()
+  {
+    this.bitSet.clear();
+  }
+
+  /**
+   * Bare metal bit set implementation. For performance reasons, this implementation does not check
+   * for index bounds nor expand the bit set size if the specified index is greater than the size.
+   */
+  public static class BitSet
+  {
+    private final long[] data;
+
+    public BitSet(long bits)
+    {
+      this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
+    }
+
+    /**
+     * Deserialize long array as bit set.
+     *
+     * @param data - bit array
+     */
+    public BitSet(long[] data)
+    {
+      assert data.length > 0 : "data length is zero!";
+      this.data = data;
+    }
+
+    /**
+     * Sets the bit at specified index.
+     *
+     * @param index - position
+     */
+    public void set(int index)
+    {
+      data[index >>> 6] |= (1L << index);
+    }
+
+    /**
+     * Returns true if the bit is set in the specified index.
+     *
+     * @param index - position
+     *
+     * @return - value at the bit position
+     */
+    public boolean get(int index)
+    {
+      return (data[index >>> 6] & (1L << index)) != 0;
+    }
+
+    /**
+     * Number of bits
+     */
+    public int bitSize()
+    {
+      return data.length * Long.SIZE;
+    }
+
+    public long[] getData()
+    {
+      return data;
+    }
+
+    /**
+     * Combines the two BitArrays using bitwise OR.
+     */
+    public void putAll(BloomKFilter.BitSet array)
+    {
+      assert data.length == array.data.length :
+          "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
+      for (int i = 0; i < data.length; i++) {
+        data[i] |= array.data[i];
+      }
+    }
+
+    /**
+     * Clear the bit set.
+     */
+    public void clear()
+    {
+      Arrays.fill(data, 0);
+    }
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java
index e06632f..c32038e 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java
@@ -22,7 +22,6 @@ package org.apache.druid.query.filter;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import org.apache.druid.guice.BloomFilterSerializersModule;
-import org.apache.hive.common.util.BloomKFilter;
 
 import java.io.IOException;
 import java.util.Objects;
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
index 81a272f..fb8d31a 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
@@ -41,7 +41,6 @@ import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.filter.BaseFilterTest;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.hive.common.util.BloomKFilter;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -49,6 +48,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -360,6 +360,78 @@ public class BloomDimFilterTest extends BaseFilterTest
     Assert.assertTrue(actualSize < 100);
   }
 
+  @Test
+  public void testStringHiveCompat() throws IOException
+  {
+    org.apache.hive.common.util.BloomKFilter hiveFilter =
+        new org.apache.hive.common.util.BloomKFilter(1500);
+    hiveFilter.addString("myTestString");
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    org.apache.hive.common.util.BloomKFilter.serialize(byteArrayOutputStream, hiveFilter);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+
+    BloomKFilter druidFilter = BloomFilterSerializersModule.bloomKFilterFromBytes(bytes);
+
+    Assert.assertTrue(druidFilter.testString("myTestString"));
+    Assert.assertFalse(druidFilter.testString("not_match"));
+  }
+
+
+  @Test
+  public void testFloatHiveCompat() throws IOException
+  {
+    org.apache.hive.common.util.BloomKFilter hiveFilter =
+        new org.apache.hive.common.util.BloomKFilter(1500);
+    hiveFilter.addFloat(32.0F);
+    hiveFilter.addFloat(66.4F);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    org.apache.hive.common.util.BloomKFilter.serialize(byteArrayOutputStream, hiveFilter);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+
+    BloomKFilter druidFilter = BloomFilterSerializersModule.bloomKFilterFromBytes(bytes);
+
+    Assert.assertTrue(druidFilter.testFloat(32.0F));
+    Assert.assertTrue(druidFilter.testFloat(66.4F));
+    Assert.assertFalse(druidFilter.testFloat(0.3F));
+  }
+
+
+  @Test
+  public void testDoubleHiveCompat() throws IOException
+  {
+    org.apache.hive.common.util.BloomKFilter hiveFilter =
+        new org.apache.hive.common.util.BloomKFilter(1500);
+    hiveFilter.addDouble(32.0D);
+    hiveFilter.addDouble(66.4D);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    org.apache.hive.common.util.BloomKFilter.serialize(byteArrayOutputStream, hiveFilter);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+
+    BloomKFilter druidFilter = BloomFilterSerializersModule.bloomKFilterFromBytes(bytes);
+
+    Assert.assertTrue(druidFilter.testDouble(32.0D));
+    Assert.assertTrue(druidFilter.testDouble(66.4D));
+    Assert.assertFalse(druidFilter.testDouble(0.3D));
+  }
+
+  @Test
+  public void testLongHiveCompat() throws IOException
+  {
+    org.apache.hive.common.util.BloomKFilter hiveFilter =
+        new org.apache.hive.common.util.BloomKFilter(1500);
+    hiveFilter.addLong(32L);
+    hiveFilter.addLong(664L);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    org.apache.hive.common.util.BloomKFilter.serialize(byteArrayOutputStream, hiveFilter);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+
+    BloomKFilter druidFilter = BloomFilterSerializersModule.bloomKFilterFromBytes(bytes);
+
+    Assert.assertTrue(druidFilter.testLong(32L));
+    Assert.assertTrue(druidFilter.testLong(664L));
+    Assert.assertFalse(druidFilter.testLong(3L));
+  }
+
   private static BloomKFilterHolder bloomKFilter(int expectedEntries, String... values) throws IOException
   {
     BloomKFilter filter = new BloomKFilter(expectedEntries);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org