You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/08 04:36:26 UTC

[incubator-pinot] 01/01: Implement off-heap bloom filter reader

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch offheap_bloom_filter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 979828bfbaaeb1a9d4707b53541782fee0bb4eec
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Wed Oct 7 21:24:55 2020 -0700

    Implement off-heap bloom filter reader
---
 .../org/apache/pinot/core/bloom/BloomFilter.java   |  74 -------------
 .../apache/pinot/core/bloom/BloomFilterUtil.java   |  57 -----------
 .../pinot/core/bloom/GuavaOnHeapBloomFilter.java   |  76 --------------
 .../core/bloom/SegmentBloomFilterFactory.java      |  51 ---------
 .../indexsegment/mutable/MutableSegmentImpl.java   |   7 ++
 .../query/pruner/ColumnValueSegmentPruner.java     |   2 +-
 .../creator/BloomFilterCreator.java}               |  42 +++-----
 .../creator/impl/bloom/BloomFilterCreator.java     |  68 ------------
 .../impl/bloom/OnHeapGuavaBloomFilterCreator.java  |  69 +++++++++++++
 .../index/column/PhysicalColumnIndexContainer.java |  16 +--
 .../segment/index/loader/IndexLoadingConfig.java   |  19 ++--
 .../loader/bloomfilter/BloomFilterHandler.java     |  58 +++++------
 .../segment/index/readers/BloomFilterReader.java   |  43 +++-----
 .../readers/bloom/BloomFilterReaderFactory.java}   |  39 ++-----
 .../bloom/GuavaBloomFilterReaderUtils.java}        |  41 +++-----
 .../bloom/OffHeapGuavaBloomFilterReader.java       |  81 +++++++++++++++
 .../apache/pinot/core/util/TableConfigUtils.java   |  18 ++--
 .../index/creator/BloomFilterCreatorTest.java      | 114 +++++----------------
 .../pinot/spi/config/table/BloomFilterConfig.java  |  40 +++-----
 .../pinot/spi/config/table/IndexingConfig.java     |  10 ++
 20 files changed, 320 insertions(+), 605 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java
deleted file mode 100644
index b4aab57..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-
-/**
- * Interface for bloom filter
- */
-public interface BloomFilter {
-
-  /**
-   * Get the version of bloom filter implementation
-   * @return a version
-   */
-  int getVersion();
-
-  /**
-   * Get the type of the bloom filter
-   * @return a bloom filter type
-   */
-  BloomFilterType getBloomFilterType();
-
-  /**
-   * Add element to bloom filter
-   *
-   * @param input input object
-   */
-  void add(Object input);
-
-  /**
-   * Check if the input element may exist or not
-   *
-   * @param input input object for testing
-   * @return true if the input may exist, false if it does not exist
-   */
-  boolean mightContain(Object input);
-
-  /**
-   * Serialize bloom filter to output stream.
-   *
-   * @param out output stream
-   * @throws IOException
-   */
-  void writeTo(OutputStream out)
-      throws IOException;
-
-  /**
-   * Deserialize the bloom filter from input stream.
-   * @param in input stream
-   * @throws IOException
-   */
-  void readFrom(InputStream in)
-      throws IOException;
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java
deleted file mode 100644
index fc83b01..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-/**
- * Util class for bloom filter
- */
-public class BloomFilterUtil {
-
-  private BloomFilterUtil() {
-  }
-
-  public static long computeNumBits(long cardinality, double maxFalsePosProbability) {
-    return (long) (Math
-        .ceil((cardinality * Math.log(maxFalsePosProbability)) / Math.log(1.0 / Math.pow(2.0, Math.log(2.0)))));
-  }
-
-  public static int computeNumberOfHashFunctions(long cardinality, long numBits) {
-    return (int) Math.max(1.0, Math.round(((double) numBits / cardinality) * Math.log(2.0)));
-  }
-
-  public static double computeMaxFalsePosProbability(long cardinality, int numHashFunction, long numBits) {
-    return Math.pow(1.0 - Math.exp(-1.0 * numHashFunction / ((double) numBits / cardinality)), numHashFunction);
-  }
-
-  public static double computeMaxFalsePositiveProbabilityForNumBits(long cardinality, long maxNumBits,
-      double defaultMaxFalsePosProbability) {
-    // Get the number of bits required for achieving default false positive probability
-    long numBitsRequired = BloomFilterUtil.computeNumBits(cardinality, defaultMaxFalsePosProbability);
-
-    // If the size of bloom filter is smaller than 1MB, use default max false positive probability
-    if (numBitsRequired <= maxNumBits) {
-      return defaultMaxFalsePosProbability;
-    }
-
-    // If the size of bloom filter is larger than 1MB, compute the maximum false positive probability within
-    // storage limit
-    int numHashFunction = BloomFilterUtil.computeNumberOfHashFunctions(cardinality, maxNumBits);
-    return BloomFilterUtil.computeMaxFalsePosProbability(cardinality, numHashFunction, maxNumBits);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java
deleted file mode 100644
index 1339516..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-import com.google.common.hash.Funnels;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-
-/**
- * Bloom filter implementation with guava library
- */
-public class GuavaOnHeapBloomFilter implements BloomFilter {
-  // Increment the version when the bloom filter implementation becomes backward incompatible
-  private static final int VERSION = 1;
-
-  private com.google.common.hash.BloomFilter _bloomFilter;
-
-  public GuavaOnHeapBloomFilter() {
-  }
-
-  public GuavaOnHeapBloomFilter(int cardinality, double maxFalsePosProbability) {
-    _bloomFilter = com.google.common.hash.BloomFilter
-        .create(Funnels.stringFunnel(Charset.forName("UTF-8")), cardinality, maxFalsePosProbability);
-  }
-
-  @Override
-  public int getVersion() {
-    return VERSION;
-  }
-
-  @Override
-  public BloomFilterType getBloomFilterType() {
-    return BloomFilterType.GUAVA_ON_HEAP;
-  }
-
-  @Override
-  public void add(Object input) {
-    _bloomFilter.put(input.toString());
-  }
-
-  @Override
-  public boolean mightContain(Object input) {
-    return _bloomFilter.mightContain(input.toString());
-  }
-
-  @Override
-  public void writeTo(OutputStream out)
-      throws IOException {
-    _bloomFilter.writeTo(out);
-  }
-
-  @Override
-  public void readFrom(InputStream in)
-      throws IOException {
-    _bloomFilter = com.google.common.hash.BloomFilter.readFrom(in, Funnels.stringFunnel(Charset.forName("UTF-8")));
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java
deleted file mode 100644
index 4d6dbd9..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-/**
- * Factory for bloom filter
- */
-public class SegmentBloomFilterFactory {
-
-  /**
-   * Factory used when creating a new bloom filter
-   *
-   * @param cardinality cardinality of column
-   * @param maxFalsePosProbability maximum false positive probability
-   * @return a bloom filter
-   */
-  public static BloomFilter createSegmentBloomFilter(int cardinality, double maxFalsePosProbability) {
-    // TODO: when we add more types of bloom filter, we will need to add a new config and wire in here
-    return new GuavaOnHeapBloomFilter(cardinality, maxFalsePosProbability);
-  }
-
-  /**
-   * Factory used when deserializing a bloom filter
-   *
-   * @param type a bloom filter type
-   * @return a bloom filter based on the given type
-   */
-  public static BloomFilter createSegmentBloomFilter(BloomFilterType type) {
-    switch (type) {
-      case GUAVA_ON_HEAP:
-        return new GuavaOnHeapBloomFilter();
-    }
-    throw new RuntimeException("Invalid bloom filter type: " + type.toString());
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 57534cf..38e061f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -1067,6 +1067,13 @@ public class MutableSegmentImpl implements MutableSegment {
           _logger.error("Caught exception while closing text index for column: {}, continuing with error", column, e);
         }
       }
+      if (_bloomFilter != null) {
+        try {
+          _bloomFilter.close();
+        } catch (Exception e) {
+          _logger.error("Caught exception while closing bloom filter for column: {}, continuing with error", column, e);
+        }
+      }
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 2704dc6..98eb36e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -155,7 +155,7 @@ public class ColumnValueSegmentPruner implements SegmentPruner {
     // Check bloom filter
     BloomFilterReader bloomFilter = dataSource.getBloomFilter();
     if (bloomFilter != null) {
-      if (!bloomFilter.mightContain(value)) {
+      if (!bloomFilter.mightContain(value.toString())) {
         return true;
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
index ac6efd8..7647b96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
@@ -16,38 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.creator;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Closeable;
+import java.io.IOException;
 
 
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
-  // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
-  // when serializing/deserializing a bloom filter
-  GUAVA_ON_HEAP(1);
-
-  private int _value;
-  private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
-  BloomFilterType(int value) {
-    _value = value;
-  }
-
-  static {
-    for (BloomFilterType pageType : BloomFilterType.values()) {
-      _bloomFilterTypeMap.put(pageType._value, pageType);
-    }
-  }
+public interface BloomFilterCreator extends Closeable {
 
-  public static BloomFilterType valueOf(int pageType) {
-    return _bloomFilterTypeMap.get(pageType);
-  }
+  /**
+   * Adds a value to the bloom filter.
+   */
+  void add(String value);
 
-  public int getValue() {
-    return _value;
-  }
+  /**
+   * Seals the index and flushes it to disk.
+   */
+  void seal()
+      throws IOException;
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
deleted file mode 100644
index 7c1a6c4..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.creator.impl.bloom;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.apache.pinot.core.bloom.BloomFilter;
-import org.apache.pinot.core.bloom.BloomFilterUtil;
-import org.apache.pinot.core.bloom.SegmentBloomFilterFactory;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
-
-
-/**
- * Bloom filter creator
- *
- * Note:
- * 1. Currently, we limit the filter size to 1MB to avoid the heap overhead. We can remove it once we have the offheap
- *    implementation of the bloom filter.
- * 2. When capping the bloom filter to 1MB, max false pos steeply grows from 1 million cardinality. If the column has
- *    larger than "5 million" cardinality, it is not recommended to use bloom filter since maxFalsePosProb is already
- *    0.45 when the filter size is 1MB.
- */
-public class BloomFilterCreator implements AutoCloseable {
-  private static double DEFAULT_MAX_FALSE_POS_PROBABILITY = 0.05;
-  private static int MB_IN_BITS = 8388608;
-
-  private BloomFilter _bloomFilter;
-  private File _bloomFilterFile;
-
-  public BloomFilterCreator(File indexDir, String columnName, int cardinality) {
-    _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
-    double maxFalsePosProbability = BloomFilterUtil
-        .computeMaxFalsePositiveProbabilityForNumBits(cardinality, MB_IN_BITS, DEFAULT_MAX_FALSE_POS_PROBABILITY);
-    _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(cardinality, maxFalsePosProbability);
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) {
-      outputStream.writeInt(_bloomFilter.getBloomFilterType().getValue());
-      outputStream.writeInt(_bloomFilter.getVersion());
-      _bloomFilter.writeTo(outputStream);
-    }
-  }
-
-  public void add(Object input) {
-    _bloomFilter.add(input.toString());
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java
new file mode 100644
index 0000000..fec9a76
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.bloom;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+
+
+/**
+ * On-heap creator for guava bloom filter.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OnHeapGuavaBloomFilterCreator implements BloomFilterCreator {
+  public static final int TYPE_VALUE = 1;
+  public static final int VERSION = 1;
+
+  private final File _bloomFilterFile;
+  private final BloomFilter<String> _bloomFilter;
+
+  public OnHeapGuavaBloomFilterCreator(File indexDir, String columnName, int cardinality,
+      BloomFilterConfig bloomFilterConfig) {
+    _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    _bloomFilter =
+        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), cardinality, bloomFilterConfig.getFpp());
+  }
+
+  @Override
+  public void add(String value) {
+    _bloomFilter.put(value);
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) {
+      out.writeInt(TYPE_VALUE);
+      out.writeInt(VERSION);
+      _bloomFilter.writeTo(out);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 8405cf3..0573afe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
 import org.apache.pinot.core.segment.index.readers.SortedIndexReader;
 import org.apache.pinot.core.segment.index.readers.StringDictionary;
 import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory;
 import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
@@ -66,7 +67,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   private final InvertedIndexReader<?> _rangeIndex;
   private final TextIndexReader _textIndex;
   private final BaseImmutableDictionary _dictionary;
-  private final BloomFilterReader _bloomFilterReader;
+  private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReaderImpl _nullValueVectorReader;
 
   public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
@@ -82,7 +83,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
       loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
       loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
-      loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
+      loadBloomFilter = indexLoadingConfig.getBloomFilterConfigs().containsKey(columnName);
       loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
     }
 
@@ -108,9 +109,9 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       //bloom filter
       if (loadBloomFilter) {
         PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
-        _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer);
+        _bloomFilter = BloomFilterReaderFactory.getBloomFilterReader(bloomFilterBuffer);
       } else {
-        _bloomFilterReader = null;
+        _bloomFilter = null;
       }
       // Dictionary-based index
       _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
@@ -150,7 +151,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       // Raw index
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
       _dictionary = null;
-      _bloomFilterReader = null;
+      _bloomFilter = null;
       _rangeIndex = null;
       _invertedIndex = null;
     }
@@ -183,7 +184,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
 
   @Override
   public BloomFilterReader getBloomFilter() {
-    return _bloomFilterReader;
+    return _bloomFilter;
   }
 
   @Override
@@ -265,5 +266,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     if (_textIndex != null) {
       _textIndex.close();
     }
+    if (_bloomFilter != null) {
+      _bloomFilter.close();
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index e296da2..798ce61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -52,7 +53,7 @@ public class IndexLoadingConfig {
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
   private Set<String> _varLengthDictionaryColumns = new HashSet<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
-  private Set<String> _bloomFilterColumns = new HashSet<>();
+  private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
   private boolean _enableDynamicStarTreeCreation;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private boolean _enableDefaultStarTree;
@@ -98,7 +99,13 @@ public class IndexLoadingConfig {
 
     List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
     if (bloomFilterColumns != null) {
-      _bloomFilterColumns.addAll(bloomFilterColumns);
+      for (String bloomFilterColumn : bloomFilterColumns) {
+        _bloomFilterConfigs.put(bloomFilterColumn, new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP));
+      }
+    }
+    Map<String, BloomFilterConfig> bloomFilterConfigs = indexingConfig.getBloomFilterConfigs();
+    if (bloomFilterConfigs != null) {
+      _bloomFilterConfigs.putAll(bloomFilterConfigs);
     }
 
     List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
@@ -265,8 +272,8 @@ public class IndexLoadingConfig {
   }
 
   @VisibleForTesting
-  public void setBloomFilterColumns(Set<String> bloomFilterColumns) {
-    _bloomFilterColumns = bloomFilterColumns;
+  public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) {
+    _bloomFilterConfigs = bloomFilterConfigs;
   }
 
   @VisibleForTesting
@@ -290,8 +297,8 @@ public class IndexLoadingConfig {
     return _onHeapDictionaryColumns;
   }
 
-  public Set<String> getBloomFilterColumns() {
-    return _bloomFilterColumns;
+  public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
+    return _bloomFilterConfigs;
   }
 
   public boolean isEnableDynamicStarTreeCreation() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
index 6c6ec81..1e15e79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -21,17 +21,20 @@ package org.apache.pinot.core.segment.index.loader.bloomfilter;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.loader.LoaderUtils;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.BytesDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.segment.index.readers.DoubleDictionary;
 import org.apache.pinot.core.segment.index.readers.FloatDictionary;
 import org.apache.pinot.core.segment.index.readers.IntDictionary;
@@ -40,6 +43,7 @@ import org.apache.pinot.core.segment.index.readers.StringDictionary;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.core.segment.store.ColumnIndexType;
 import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,16 +56,18 @@ public class BloomFilterHandler {
   private final SegmentDirectory.Writer _segmentWriter;
   private final String _segmentName;
   private final SegmentVersion _segmentVersion;
+  private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
   private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>();
 
-  public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
-      @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+  public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
     _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
 
-    for (String column : indexLoadingConfig.getBloomFilterColumns()) {
+    for (String column : _bloomFilterConfigs.keySet()) {
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
         _bloomFilterColumns.add(columnMetadata);
@@ -75,6 +81,7 @@ public class BloomFilterHandler {
       if (columnMetadata.hasDictionary()) {
         createBloomFilterForColumn(columnMetadata);
       }
+      // TODO: Support raw index
     }
   }
 
@@ -102,19 +109,17 @@ public class BloomFilterHandler {
     }
 
     // Create new bloom filter for the column.
-    LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, columnName);
-    try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnName, columnMetadata.getCardinality())) {
-      if (columnMetadata.hasDictionary()) {
-        // Read dictionary
-        try (BaseImmutableDictionary dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) {
-          for (int i = 0; i < dictionaryReader.length(); i++) {
-            creator.add(dictionaryReader.get(i));
-          }
-        }
-      } else {
-        // Read the forward index
-        throw new UnsupportedOperationException("Bloom filters not supported for no dictionary columns");
+    BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
+    LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", _segmentName, columnName,
+        bloomFilterConfig);
+    try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName,
+        columnMetadata.getCardinality(), bloomFilterConfig);
+        Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
+      int length = dictionary.length();
+      for (int i = 0; i < length; i++) {
+        bloomFilterCreator.add(dictionary.getStringValue(i));
       }
+      bloomFilterCreator.seal();
     }
 
     // For v3, write the generated bloom filter file into the single file and remove it.
@@ -133,29 +138,24 @@ public class BloomFilterHandler {
     PinotDataBuffer dictionaryBuffer =
         segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
     int cardinality = columnMetadata.getCardinality();
-    BaseImmutableDictionary dictionaryReader;
     DataType dataType = columnMetadata.getDataType();
     switch (dataType) {
       case INT:
-        dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality);
-        break;
+        return new IntDictionary(dictionaryBuffer, cardinality);
       case LONG:
-        dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality);
-        break;
+        return new LongDictionary(dictionaryBuffer, cardinality);
       case FLOAT:
-        dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality);
-        break;
+        return new FloatDictionary(dictionaryBuffer, cardinality);
       case DOUBLE:
-        dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality);
-        break;
+        return new DoubleDictionary(dictionaryBuffer, cardinality);
       case STRING:
-        dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
+        return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
             (byte) columnMetadata.getPaddingCharacter());
-        break;
+      case BYTES:
+        return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength());
       default:
         throw new IllegalStateException(
             "Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName());
     }
-    return dictionaryReader;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
index 63bebde..09a1dcf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
@@ -18,40 +18,23 @@
  */
 package org.apache.pinot.core.segment.index.readers;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.pinot.core.bloom.BloomFilter;
-import org.apache.pinot.core.bloom.BloomFilterType;
-import org.apache.pinot.core.bloom.SegmentBloomFilterFactory;
-import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import java.io.Closeable;
 
 
 /**
- * Bloom filter reader
+ * Interface for bloom filter reader.
  */
-public class BloomFilterReader {
+public interface BloomFilterReader extends Closeable {

-  private BloomFilter _bloomFilter;
+  /**
+   * Returns {@code true} if the given value might have been put in this bloom filter, {@code false} otherwise.
+   */
+  boolean mightContain(String value);

-  public BloomFilterReader(PinotDataBuffer bloomFilterBuffer)
-      throws IOException {
-    byte[] buffer = new byte[(int) bloomFilterBuffer.size()];
-    bloomFilterBuffer.copyTo(0, buffer);
-
-    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
-      BloomFilterType bloomFilterType = BloomFilterType.valueOf(in.readInt());
-      int version = in.readInt();
-      _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(bloomFilterType);
-      if (version != _bloomFilter.getVersion()) {
-        throw new IOException(
-            "Unexpected bloom filter version (type: " + bloomFilterType.toString() + ", version: " + version);
-      }
-      _bloomFilter.readFrom(in);
-    }
-  }
-
-  public boolean mightContain(Object key) {
-    return _bloomFilter.mightContain(key.toString());
-  }
+  /**
+   * Returns {@code true} if the value with the given hash might have been put in this bloom filter, {@code false}
+   * otherwise.
+   * <p>This method is provided to prevent hashing the same value multiple times.
+   */
+  boolean mightContain(byte[] hash);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
index ac6efd8..e507b3d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
@@ -16,38 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.index.readers.bloom;

-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;


-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
-  // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
-  // when serializing/deserializing a bloom filter
-  GUAVA_ON_HEAP(1);
-
-  private int _value;
-  private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
-  BloomFilterType(int value) {
-    _value = value;
-  }
-
-  static {
-    for (BloomFilterType pageType : BloomFilterType.values()) {
-      _bloomFilterTypeMap.put(pageType._value, pageType);
-    }
-  }
-
-  public static BloomFilterType valueOf(int pageType) {
-    return _bloomFilterTypeMap.get(pageType);
+/**
+ * Factory for creating {@link BloomFilterReader}s from serialized bloom filters.
+ */
+public class BloomFilterReaderFactory {
+  private BloomFilterReaderFactory() {
   }

-  public int getValue() {
-    return _value;
+  /**
+   * Returns a bloom filter reader for the given data buffer.
+   * <p>The data buffer starts with a 4-byte type value and a 4-byte version, followed by the bloom filter data. Only
+   * type value 1 with version 1 (Guava bloom filter, read off-heap) is currently supported.
+   *
+   * @throws IllegalStateException if the type value or version is not supported
+   */
+  public static BloomFilterReader getBloomFilterReader(PinotDataBuffer dataBuffer) {
+    int typeValue = dataBuffer.getInt(0);
+    int version = dataBuffer.getInt(4);
+    Preconditions.checkState(typeValue == 1 && version == 1, "Unsupported bloom filter type value: %s, version: %s",
+        typeValue, version);
+    return new OffHeapGuavaBloomFilterReader(dataBuffer.view(8, dataBuffer.size()));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
index ac6efd8..648fdff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
@@ -16,38 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.index.readers.bloom;
 
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.pinot.spi.utils.StringUtils;
 
 
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
-  // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
-  // when serializing/deserializing a bloom filter
-  GUAVA_ON_HEAP(1);
-
-  private int _value;
-  private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
-  BloomFilterType(int value) {
-    _value = value;
+@SuppressWarnings("UnstableApiUsage")
+public class GuavaBloomFilterReaderUtils {
+  private GuavaBloomFilterReaderUtils() {
   }

-  static {
-    for (BloomFilterType pageType : BloomFilterType.values()) {
-      _bloomFilterTypeMap.put(pageType._value, pageType);
-    }
-  }
-
-  public static BloomFilterType valueOf(int pageType) {
-    return _bloomFilterTypeMap.get(pageType);
-  }
+  // DO NOT change the hash function. It has to be aligned with the bloom filter creator.
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();

-  public int getValue() {
-    return _value;
+  /**
+   * Returns the murmur3_128 hash of the UTF-8 encoded value (16 bytes), see BloomFilterReader.mightContain(byte[]).
+   */
+  public static byte[] hash(String value) {
+    return HASH_FUNCTION.hashBytes(StringUtils.encodeUtf8(value)).asBytes();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java
new file mode 100644
index 0000000..19a632f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers.bloom;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * Off-heap reader for guava bloom filter.
+ * <p>The behavior should be aligned with {@link com.google.common.hash.BloomFilter}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OffHeapGuavaBloomFilterReader implements BloomFilterReader {
+  // Size of the header in bytes (see the constructor javadoc for the header format)
+  private static final int HEADER_SIZE = 6;
+
+  private final int _numHashFunctions;
+  private final long _numBits;
+  private final PinotDataBuffer _valueBuffer;
+
+  /**
+   * Format of the data buffer header:
+   *   - Strategy ordinal: 1 byte
+   *   - Number of hash functions: 1 byte
+   *   - Number of long values: 4 bytes
+   * <p>The header is followed by the long values that hold the bits of the bloom filter.
+   *
+   * @throws IllegalStateException if the header is invalid or the buffer is too small to hold all the long values
+   */
+  public OffHeapGuavaBloomFilterReader(PinotDataBuffer dataBuffer) {
+    byte strategyOrdinal = dataBuffer.getByte(0);
+    Preconditions.checkState(strategyOrdinal == 1, "Invalid strategy ordinal: %s", strategyOrdinal);
+    _numHashFunctions = dataBuffer.getByte(1) & 0xFF;
+    // With 0 hash functions, mightContain() would return true for every value
+    Preconditions.checkState(_numHashFunctions > 0, "Invalid number of hash functions: %s", _numHashFunctions);
+    int numLongs = dataBuffer.getInt(2);
+    // With 0 bits, the modulo in mightContain() would divide by zero
+    Preconditions.checkState(numLongs > 0, "Invalid number of long values: %s", numLongs);
+    _numBits = (long) numLongs * Long.SIZE;
+    // Reject truncated buffers so that mightContain() never reads past the end of the long values
+    long dataBufferSize = dataBuffer.size();
+    Preconditions.checkState(dataBufferSize >= HEADER_SIZE + (long) numLongs * Long.BYTES,
+        "Data buffer size: %s is too small to hold %s long values", dataBufferSize, numLongs);
+    _valueBuffer = dataBuffer.view(HEADER_SIZE, dataBufferSize);
+  }
+
+  @Override
+  public boolean mightContain(String value) {
+    return mightContain(GuavaBloomFilterReaderUtils.hash(value));
+  }
+
+  @Override
+  public boolean mightContain(byte[] hash) {
+    // hash1 is built from bytes 0-7 and hash2 from bytes 8-15 of the hash, each read in little-endian order
+    long hash1 = Longs.fromBytes(hash[7], hash[6], hash[5], hash[4], hash[3], hash[2], hash[1], hash[0]);
+    long hash2 = Longs.fromBytes(hash[15], hash[14], hash[13], hash[12], hash[11], hash[10], hash[9], hash[8]);
+    long combinedHash = hash1;
+    for (int i = 0; i < _numHashFunctions; i++) {
+      long bitIndex = (combinedHash & Long.MAX_VALUE) % _numBits;
+      // NOTE: Guava bloom filter stores bits in a long array. Inside each long value, the bits are stored in the
+      //       reverse order (the first bit is stored as the right most bit of the long).
+      //       Each long value is stored with its most significant byte first, so bit b of a long is in byte
+      //       (7 - b / 8) of that long, at position (b % 8) within the byte.
+      long longIndex = bitIndex >>> 6;
+      int bitIndexInLong = (int) (bitIndex & 0x3F);
+      // Compute the byte offset as a long so that it does not overflow for bit arrays larger than 2GB
+      long byteIndex = (longIndex << 3) | (7 - (bitIndexInLong >>> 3));
+      if ((_valueBuffer.getByte(byteIndex) & (1 << (bitIndexInLong & 7))) == 0) {
+        return false;
+      }
+      combinedHash += hash2;
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible for closing the PinotDataBuffer.
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index d793f34..18267f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -40,7 +40,6 @@ import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
@@ -294,14 +293,19 @@ public final class TableConfigUtils {
         noDictionaryColumnsSet.add(columnName);
       }
     }
+    Set<String> bloomFilterColumns = new HashSet<>();
     if (indexingConfig.getBloomFilterColumns() != null) {
-      for (String columnName : indexingConfig.getBloomFilterColumns()) {
-        if (noDictionaryColumnsSet.contains(columnName)) {
-          throw new IllegalStateException(
-              "Cannot create a Bloom Filter on column " + columnName + " specified in the noDictionaryColumns config");
-        }
-        columnNameToConfigMap.put(columnName, "Bloom Filter Config");
+      bloomFilterColumns.addAll(indexingConfig.getBloomFilterColumns());
+    }
+    if (indexingConfig.getBloomFilterConfigs() != null) {
+      bloomFilterColumns.addAll(indexingConfig.getBloomFilterConfigs().keySet());
+    }
+    for (String bloomFilterColumn : bloomFilterColumns) {
+      if (noDictionaryColumnsSet.contains(bloomFilterColumn)) {
+        throw new IllegalStateException("Cannot create a Bloom Filter on column " + bloomFilterColumn
+            + " specified in the noDictionaryColumns config");
       }
+      columnNameToConfigMap.put(bloomFilterColumn, "Bloom Filter Config");
     }
     if (indexingConfig.getInvertedIndexColumns() != null) {
       for (String columnName : indexingConfig.getInvertedIndexColumns()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
index e89b36d..4ecc586 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
@@ -18,120 +18,54 @@
  */
 package org.apache.pinot.core.segment.index.creator;
 
-import com.google.common.base.Preconditions;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.pinot.core.bloom.BloomFilterType;
-import org.apache.pinot.core.bloom.BloomFilterUtil;
-import org.apache.pinot.core.bloom.GuavaOnHeapBloomFilter;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class BloomFilterCreatorTest {
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BloomFilterCreatorTest");
-  private static int MB_IN_BYTES = 1024 * 1024;

+  @BeforeClass
   public void setUp()
       throws Exception {
-    if (TEMP_DIR.exists()) {
-      FileUtils.deleteQuietly(TEMP_DIR);
-    }
-    TEMP_DIR.mkdir();
-  }
-
-  @Test
-  public void testBloomFilterUtil() {
-    // Test against the known results
-    Assert.assertEquals(BloomFilterUtil.computeNumBits(1000000, 0.03), 7298441);
-    Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.03), 72984409);
-    Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.1), 47925292);
-
-    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(1000000, 7298441), 5);
-    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 72984409), 5);
-    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 47925292), 3);
-
-    double threshold = 0.001;
-    Assert
-        .assertTrue(compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(1000000, 5, 7298441), 0.03, threshold));
-    Assert.assertTrue(
-        compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 5, 72984409), 0.03, threshold));
-    Assert.assertTrue(
-        compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 3, 47925292), 0.1, threshold));
-  }
-
-  private boolean compareDouble(double a, double b, double threshold) {
-    if (Math.abs(a - b) < threshold) {
-      return true;
-    }
-    return false;
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
   }

   @Test
   public void testBloomFilterCreator()
       throws Exception {
-    // Create bloom filter directory
-    File bloomFilterDir = new File(TEMP_DIR, "bloomFilterDir");
-    bloomFilterDir.mkdirs();
-
-    // Create a bloom filter and serialize it to a file
+    // Create the bloom filter and add the values "0" to "4"
     int cardinality = 10000;
     String columnName = "testColumn";
-    BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(bloomFilterDir, columnName, cardinality);
-    for (int i = 0; i < 5; i++) {
-      bloomFilterCreator.add(Integer.toString(i));
-    }
-    bloomFilterCreator.close();
-
-    // Deserialize the bloom filter and validate
-    File bloomFilterFile = new File(bloomFilterDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
-
-    try (DataInputStream in = new DataInputStream(new FileInputStream(bloomFilterFile))) {
-      BloomFilterType type = BloomFilterType.valueOf(in.readInt());
-      int version = in.readInt();
-      GuavaOnHeapBloomFilter bloomFilter = new GuavaOnHeapBloomFilter();
-
-      Assert.assertEquals(type, bloomFilter.getBloomFilterType());
-      Assert.assertEquals(version, bloomFilter.getVersion());
-
-      bloomFilter.readFrom(in);
+    try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(TEMP_DIR, columnName, cardinality,
+        new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP))) {
       for (int i = 0; i < 5; i++) {
-        Assert.assertTrue(bloomFilter.mightContain(Integer.toString(i)));
-      }
-      for (int j = 5; j < 10; j++) {
-        Assert.assertFalse(bloomFilter.mightContain(Integer.toString(j)));
+        bloomFilterCreator.add(Integer.toString(i));
       }
+      bloomFilterCreator.seal();
     }
-  }
-
-  @Test
-  public void testBloomFilterSize()
-      throws Exception {
-    int cardinalityArray[] = new int[]{10, 100, 1000, 100000, 100000, 1000000, 5000000, 10000000};
-    for (int cardinality : cardinalityArray) {
-      FileUtils.deleteQuietly(TEMP_DIR);
-      File indexDir = new File(TEMP_DIR, "testBloomFilterSize");
-      Preconditions.checkState(indexDir.mkdirs());
-
-      String columnName = "testSize";
-      BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(indexDir, columnName, cardinality);
-      bloomFilterCreator.close();

-      File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
-
-      try (InputStream inputStream = new FileInputStream(bloomFilterFile)) {
-        byte[] bloomFilterBytes = IOUtils.toByteArray(inputStream);
-        long actualBloomFilterSize = bloomFilterBytes.length;
-        // Check if the size of bloom filter does not go beyond 1MB. Note that guava bloom filter has 11-12 bytes of
-        // overhead
-        Assert.assertTrue(actualBloomFilterSize < MB_IN_BYTES + 12);
+    // Read the bloom filter back: added values "0"-"4" must be found, values "5"-"9" are expected to be absent
+    File bloomFilterFile = new File(TEMP_DIR, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(bloomFilterFile);
+        BloomFilterReader bloomFilterReader = BloomFilterReaderFactory.getBloomFilterReader(dataBuffer)) {
+      for (int i = 0; i < 5; i++) {
+        Assert.assertTrue(bloomFilterReader.mightContain(Integer.toString(i)));
+      }
+      for (int i = 5; i < 10; i++) {
+        Assert.assertFalse(bloomFilterReader.mightContain(Integer.toString(i)));
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
similarity index 50%
rename from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
index ac6efd8..d488ece 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
@@ -16,38 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.spi.config.table;

-import java.util.HashMap;
-import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.pinot.spi.config.BaseJsonConfig;


-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
-  // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
-  // when serializing/deserializing a bloom filter
-  GUAVA_ON_HEAP(1);
-
-  private int _value;
-  private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
+/**
+ * Config for the bloom filter on a column.
+ */
+public class BloomFilterConfig extends BaseJsonConfig {
+  public static final double DEFAULT_FPP = 0.05;

-  BloomFilterType(int value) {
-    _value = value;
-  }
-
-  static {
-    for (BloomFilterType pageType : BloomFilterType.values()) {
-      _bloomFilterTypeMap.put(pageType._value, pageType);
-    }
-  }
+  private final double _fpp;

-  public static BloomFilterType valueOf(int pageType) {
-    return _bloomFilterTypeMap.get(pageType);
+  /**
+   * @param fpp False positive probability (fpp) of the bloom filter, must be strictly between 0.0 and 1.0
+   * @throws IllegalArgumentException if the fpp is not strictly between 0.0 and 1.0
+   */
+  @JsonCreator
+  public BloomFilterConfig(@JsonProperty(value = "fpp", required = true) double fpp) {
+    Preconditions.checkArgument(fpp > 0.0 && fpp < 1.0, "Invalid fpp (false positive probability): %s", fpp);
+    _fpp = fpp;
   }

-  public int getValue() {
-    return _value;
+  /**
+   * Returns the false positive probability (fpp) of the bloom filter.
+   */
+  public double getFpp() {
+    return _fpp;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3081e28..3dd137b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -31,6 +31,7 @@ public class IndexingConfig extends BaseJsonConfig {
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
   private List<String> _bloomFilterColumns;
+  private Map<String, BloomFilterConfig> _bloomFilterConfigs;
   private String _loadMode;
   private Map<String, String> _streamConfigs;
   private String _segmentFormatVersion;
@@ -105,6 +106,21 @@ public class IndexingConfig extends BaseJsonConfig {
   }

+  /**
+   * Returns the bloom filter configs keyed by column name, or {@code null} if not configured.
+   */
   @Nullable
+  public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
+    return _bloomFilterConfigs;
+  }
+
+  /**
+   * Sets the bloom filter configs keyed by column name.
+   */
+  public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) {
+    _bloomFilterConfigs = bloomFilterConfigs;
+  }
+
+  @Nullable
   public String getLoadMode() {
     return _loadMode;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org