Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/01 02:52:07 UTC

[incubator-pinot] branch master updated: Adding support for bloom filter (#3528)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 75a4f85  Adding support for bloom filter (#3528)
75a4f85 is described below

commit 75a4f8541d26043d22a11fa8ef7e6c40a9a7589b
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Fri Nov 30 18:52:01 2018 -0800

    Adding support for bloom filter (#3528)
    
    * Adding support for bloom filter
    
    * Fixing failing test cases
    
    * Updated bloom filter implementation
    1. Changed the implementation to use Guava BloomFilter library
    2. Added unit test for bloom filter
    3. Enforcing the maximum size of the bloom filter to 1MB
    
    * Adding bloom filter to integration tests
---
 .../pinot/common/config/IndexingConfig.java        |  12 ++
 .../linkedin/pinot/common/config/TableConfig.java  |   7 +
 .../pinot/common/segment/SegmentMetadata.java      |   2 +
 .../com/linkedin/pinot/core/bloom/BloomFilter.java |  69 +++++++++
 .../linkedin/pinot/core/bloom/BloomFilterType.java |  50 +++++++
 .../linkedin/pinot/core/bloom/BloomFilterUtil.java |  54 +++++++
 .../pinot/core/bloom/GuavaOnHeapBloomFilter.java   |  74 ++++++++++
 .../core/bloom/SegmentBloomFilterFactory.java      |  48 +++++++
 .../com/linkedin/pinot/core/common/DataSource.java |   3 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |   5 +-
 .../core/query/pruner/AbstractSegmentPruner.java   |  17 ++-
 .../query/pruner/ColumnValueSegmentPruner.java     |  44 ++++--
 .../core/query/pruner/PartitionSegmentPruner.java  |  10 +-
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../creator/impl/bloom/BloomFilterCreator.java     |  66 +++++++++
 .../core/segment/index/SegmentMetadataImpl.java    |   5 +
 .../segment/index/column/ColumnIndexContainer.java |   3 +
 .../index/column/PhysicalColumnIndexContainer.java |  21 +++
 .../index/data/source/ColumnDataSource.java        |  17 ++-
 .../segment/index/loader/IndexLoadingConfig.java   |  17 +++
 .../segment/index/loader/SegmentPreProcessor.java  |   7 +-
 .../loader/bloomfilter/BloomFilterHandler.java     | 160 +++++++++++++++++++++
 .../segment/index/readers/BloomFilterReader.java   |  54 +++++++
 .../core/segment/store/ColumnIndexDirectory.java   |  18 ++-
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |  17 +++
 .../segment/store/SegmentLocalFSDirectory.java     |   5 +
 .../segment/store/SingleFileIndexDirectory.java    |  12 ++
 .../virtualcolumn/VirtualColumnIndexContainer.java |   6 +
 .../v2/store/StarTreeDimensionDataSource.java      |   6 +
 .../v2/store/StarTreeMetricDataSource.java         |   6 +
 .../core/common/RealtimeNoDictionaryTest.java      |   8 +-
 .../index/creator/BloomFilterCreatorTest.java      | 150 +++++++++++++++++++
 .../store/ColumnIndexDirectoryTestHelper.java      |  15 ++
 .../query/pruner/ColumnValueSegmentPrunerTest.java |   4 +-
 .../tests/BaseClusterIntegrationTest.java          |   6 +
 .../pinot/integration/tests/ClusterTest.java       |  26 ++--
 ...mentBuildPushOfflineClusterIntegrationTest.java |   2 +-
 .../tests/HybridClusterIntegrationTest.java        |   4 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |  39 ++++-
 .../tests/RealtimeClusterIntegrationTest.java      |   4 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  |   6 +-
 .../linkedin/pinot/perf/PerfBenchmarkRunner.java   |  71 ---------
 .../com/linkedin/pinot/perf/PerfBenchmarkTest.java |  77 ----------
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   6 +-
 .../pinot/tools/perf/PerfBenchmarkRunner.java      |  16 ++-
 pom.xml                                            |   1 +
 48 files changed, 1044 insertions(+), 212 deletions(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
index b57825c..35de1e3 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
@@ -43,6 +43,9 @@ public class IndexingConfig {
   @ConfigKey("sortedColumn")
   private List<String> _sortedColumn = new ArrayList<>();
 
+  @ConfigKey("bloomFilterColumns")
+  private List<String> _bloomFilterColumns = new ArrayList<>();
+
   @ConfigKey("loadMode")
   private String _loadMode;
 
@@ -113,6 +116,15 @@ public class IndexingConfig {
     _sortedColumn = sortedColumn;
   }
 
+  
+  public List<String> getBloomFilterColumns() {
+    return _bloomFilterColumns;
+  }
+
+  public void setBloomFilterColumns(List<String> bloomFilterColumns) {
+    _bloomFilterColumns = bloomFilterColumns;
+  }
+
   public String getLoadMode() {
     return _loadMode;
   }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..373f369 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -394,6 +394,7 @@ public class TableConfig {
     private List<String> _invertedIndexColumns;
     private List<String> _noDictionaryColumns;
     private List<String> _onHeapDictionaryColumns;
+    private List<String> _bloomFilterColumns;
     private Map<String, String> _streamConfigs;
     private String _streamPartitionAssignmentStrategy = DEFAULT_STREAM_PARTITION_ASSIGNMENT_STRATEGY;
 
@@ -508,6 +509,11 @@ public class TableConfig {
       return this;
     }
 
+    public Builder setBloomFilterColumns(List<String> bloomFilterColumns) {
+      _bloomFilterColumns = bloomFilterColumns;
+      return this;
+    }
+
     public Builder setNoDictionaryColumns(List<String> noDictionaryColumns) {
       _noDictionaryColumns = noDictionaryColumns;
       return this;
@@ -583,6 +589,7 @@ public class TableConfig {
       indexingConfig.setNoDictionaryColumns(_noDictionaryColumns);
       indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns);
       indexingConfig.setStreamConfigs(_streamConfigs);
+      indexingConfig.setBloomFilterColumns(_bloomFilterColumns);
       StreamConsumptionConfig streamConsumptionConfig = new StreamConsumptionConfig();
       streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy);
       indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig);
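
For reference, wiring bloom filters through the table config now looks like the sketch below. This is a minimal illustration, assuming the Builder's pre-existing entry points (table type constructor, setTableName) from outside this diff, plus a hypothetical table and column name:

    TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
        .setTableName("myTable")                                        // hypothetical table
        .setBloomFilterColumns(Collections.singletonList("memberId"))   // hypothetical column
        .build();
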
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
index 3a2d1bb..b1e4962 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
@@ -82,6 +82,8 @@ public interface SegmentMetadata {
 
   String getBitmapInvertedIndexFileName(String column);
 
+  String getBloomFilterFileName(String column);
+
   String getCreatorName();
 
   char getPaddingCharacter();
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java
new file mode 100644
index 0000000..9c9e62c
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+
+/**
+ * Interface for bloom filter
+ */
+public interface BloomFilter {
+
+  /**
+   * Get the version of bloom filter implementation
+   * @return a version
+   */
+  int getVersion();
+
+  /**
+   * Get the type of the bloom filter
+   * @return a bloom filter type
+   */
+  BloomFilterType getBloomFilterType();
+
+  /**
+   * Add element to bloom filter
+   *
+   * @param input input object
+   */
+  void add(Object input);
+
+  /**
+   * Check if the input element may exist or not
+   *
+   * @param input input object for testing
+   * @return true if the input may exist; false if the input definitely does not exist
+   */
+  boolean mightContain(Object input);
+
+  /**
+   * Serialize bloom filter to output stream.
+   *
+   * @param out output stream
+   * @throws IOException
+   */
+  void writeTo(OutputStream out) throws IOException;
+
+  /**
+   * Deserialize the bloom filter from input stream.
+   * @param in input stream
+   * @throws IOException
+   */
+  void readFrom(InputStream in) throws IOException;
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java
new file mode 100644
index 0000000..75c9b1e
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Enum for bloom filter type
+ */
+public enum BloomFilterType {
+  // NOTE: Do not change the value assigned to a bloom filter type when adding a new type, since the type value is
+  // written and checked when serializing/deserializing a bloom filter
+  GUAVA_ON_HEAP(1);
+
+  private final int _value;
+  private static final Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
+
+  BloomFilterType(int value) {
+    _value = value;
+  }
+
+  static {
+    for (BloomFilterType filterType : BloomFilterType.values()) {
+      _bloomFilterTypeMap.put(filterType._value, filterType);
+    }
+  }
+
+  public static BloomFilterType valueOf(int value) {
+    return _bloomFilterTypeMap.get(value);
+  }
+
+  public int getValue() {
+    return _value;
+  }
+}
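
Because the integer value is what gets persisted, the valueOf(int) round trip must stay stable across releases; a quick illustration:

    // The persisted value (1) must keep mapping back to the same constant.
    int persisted = BloomFilterType.GUAVA_ON_HEAP.getValue();      // 1
    BloomFilterType restored = BloomFilterType.valueOf(persisted); // GUAVA_ON_HEAP
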
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java
new file mode 100644
index 0000000..7e240b0
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+/**
+ * Util class for bloom filter
+ */
+public class BloomFilterUtil {
+
+  private BloomFilterUtil() {
+  }
+
+  public static long computeNumBits(long cardinality, double maxFalsePosProbability) {
+    // Standard bloom filter sizing: numBits = ceil(-cardinality * ln(p) / (ln 2)^2)
+    return (long) Math.ceil((-cardinality * Math.log(maxFalsePosProbability)) / (Math.log(2.0) * Math.log(2.0)));
+  }
+
+  public static int computeNumberOfHashFunctions(long cardinality, long numBits) {
+    return (int) Math.max(1.0, Math.round(((double) numBits / cardinality) * Math.log(2.0)));
+  }
+
+  public static double computeMaxFalsePosProbability(long cardinality, int numHashFunction, long numBits) {
+    return Math.pow(1.0 - Math.exp(-1.0 * numHashFunction / ((double) numBits / cardinality)), numHashFunction);
+  }
+
+  public static double computeMaxFalsePositiveProbabilityForNumBits(long cardinality, long maxNumBits,
+      double defaultMaxFalsePosProbability) {
+    // Get the number of bits required for achieving default false positive probability
+    long numBitsRequired = BloomFilterUtil.computeNumBits(cardinality, defaultMaxFalsePosProbability);
+
+    // If the required size is within maxNumBits, use the default max false positive probability
+    if (numBitsRequired <= maxNumBits) {
+      return defaultMaxFalsePosProbability;
+    }
+
+    // Otherwise, compute the lowest false positive probability achievable within the
+    // storage limit (maxNumBits)
+    int numHashFunction = BloomFilterUtil.computeNumberOfHashFunctions(cardinality, maxNumBits);
+    return BloomFilterUtil.computeMaxFalsePosProbability(cardinality, numHashFunction, maxNumBits);
+  }
+}
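
To make the sizing trade-off concrete, some illustrative arithmetic with these helpers (1MB = 8,388,608 bits; results rounded):

    // 1M cardinality at the default 0.05 probability fits under the 1MB cap:
    long numBits = BloomFilterUtil.computeNumBits(1_000_000L, 0.05);
    // numBits ~= 6.24 million bits (~0.74MB), so the default probability is kept

    // At 5M cardinality, the 1MB cap forces a much weaker filter:
    double p = BloomFilterUtil.computeMaxFalsePositiveProbabilityForNumBits(5_000_000L, 8_388_608L, 0.05);
    // p ~= 0.45, which is why BloomFilterCreator below discourages very high cardinality columns
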
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java
new file mode 100644
index 0000000..881803d
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import com.google.common.hash.Funnels;
+
+
+/**
+ * Bloom filter implementation with guava library
+ */
+public class GuavaOnHeapBloomFilter implements BloomFilter {
+  // Increment the version when the bloom filter implementation becomes backward incompatible
+  private static final int VERSION = 1;
+
+  private com.google.common.hash.BloomFilter _bloomFilter;
+
+  public GuavaOnHeapBloomFilter() {
+  }
+
+  public GuavaOnHeapBloomFilter(int cardinality, double maxFalsePosProbability) {
+    _bloomFilter =
+        com.google.common.hash.BloomFilter.create(Funnels.stringFunnel(Charset.forName("UTF-8")), cardinality,
+            maxFalsePosProbability);
+  }
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public BloomFilterType getBloomFilterType() {
+    return BloomFilterType.GUAVA_ON_HEAP;
+  }
+
+  @Override
+  public void add(Object input) {
+    _bloomFilter.put(input.toString());
+  }
+
+  @Override
+  public boolean mightContain(Object input) {
+    return _bloomFilter.mightContain(input.toString());
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    _bloomFilter.writeTo(out);
+  }
+
+  @Override
+  public void readFrom(InputStream in) throws IOException {
+    _bloomFilter =
+        com.google.common.hash.BloomFilter.readFrom(in, Funnels.stringFunnel(Charset.forName("UTF-8")));
+  }
+}
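
A minimal serialization round trip with this implementation, using hypothetical values (java.io stream imports assumed):

    GuavaOnHeapBloomFilter filter = new GuavaOnHeapBloomFilter(10_000, 0.05);
    filter.add("value-1");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    filter.writeTo(out);

    GuavaOnHeapBloomFilter restored = new GuavaOnHeapBloomFilter();
    restored.readFrom(new ByteArrayInputStream(out.toByteArray()));
    restored.mightContain("value-1"); // always true: bloom filters have no false negatives
    restored.mightContain("value-2"); // false with high probability (fpp configured as 0.05)
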
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java
new file mode 100644
index 0000000..c8c580c
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+/**
+ * Factory for bloom filter
+ */
+public class SegmentBloomFilterFactory {
+
+  /**
+   * Factory used when creating a new bloom filter
+   *
+   * @param cardinality cardinality of column
+   * @param maxFalsePosProbability maximum false positive probability
+   * @return a bloom filter
+   */
+  public static BloomFilter createSegmentBloomFilter(int cardinality, double maxFalsePosProbability) {
+    // TODO: when we add more types of bloom filter, we will need to add a new config and wire in here
+    return new GuavaOnHeapBloomFilter(cardinality, maxFalsePosProbability);
+  }
+
+  /**
+   * Factory used when deserializing a bloom filter
+   *
+   * @param type a bloom filter type
+   * @return a bloom filter based on the given type
+   */
+  public static BloomFilter createSegmentBloomFilter(BloomFilterType type) {
+    switch (type) {
+      case GUAVA_ON_HEAP:
+        return new GuavaOnHeapBloomFilter();
+    }
+    throw new RuntimeException("Invalid bloom filter type: " + type.toString());
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
index b44d721..6397024 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.common;
 
 import com.linkedin.pinot.core.operator.BaseOperator;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -26,4 +27,6 @@ public abstract class DataSource extends BaseOperator {
   public abstract InvertedIndexReader getInvertedIndex();
 
   public abstract Dictionary getDictionary();
+  /** Returns the bloom filter reader for the column, or {@code null} if it does not exist. */
+  public abstract BloomFilterReader getBloomFilter();
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index a32068a..9bd9de5 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,6 +35,7 @@ import com.linkedin.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndex
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
 import com.linkedin.pinot.core.segment.index.data.source.ColumnDataSource;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnContext;
 import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
 import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -79,6 +80,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
   private final Map<String, Integer> _maxNumValuesMap = new HashMap<>();
   private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
+  private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
   private boolean _aggregateMetrics;
 
@@ -386,7 +388,8 @@ public class MutableSegmentImpl implements MutableSegment {
   public ColumnDataSource getDataSource(String columnName) {
     if (!_schema.isVirtualColumn(columnName)) {
       return new ColumnDataSource(_schema.getFieldSpecFor(columnName), _numDocsIndexed, _maxNumValuesMap.get(columnName),
-          _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName));
+          _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName),
+          _bloomFilterMap.get(columnName));
     } else {
       return getVirtualDataSource(columnName);
     }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
index 258fae9..7d61461 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
@@ -15,13 +15,15 @@
  */
 package com.linkedin.pinot.core.query.pruner;
 
-import java.util.List;
-import java.util.Map;
 import com.linkedin.pinot.common.data.FieldSpec;
 import com.linkedin.pinot.common.request.FilterOperator;
 import com.linkedin.pinot.common.utils.request.FilterQueryTree;
 import com.linkedin.pinot.core.query.exception.BadQueryRequestException;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.List;
+import java.util.Map;
 import javax.annotation.Nonnull;
 
 
@@ -32,7 +34,8 @@ import javax.annotation.Nonnull;
  */
 public abstract class AbstractSegmentPruner implements SegmentPruner {
 
-  public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap);
+  public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap,
+      Map<String, BloomFilterReader> bloomFilterMap);
 
   /**
    * Given a non leaf filter query tree node prunes it as follows:
@@ -46,8 +49,8 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
    *
    * @return True to prune, false otherwise
    */
-  protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree,
-      @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+  protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree, @Nonnull Map<String, ColumnMetadata> columnMetadataMap,
+      Map<String, BloomFilterReader> bloomFilterMap) {
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
     if (children.isEmpty()) {
@@ -58,7 +61,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
     switch (filterOperator) {
       case AND:
         for (FilterQueryTree child : children) {
-          if (pruneSegment(child, columnMetadataMap)) {
+          if (pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
             return true;
           }
         }
@@ -66,7 +69,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
 
       case OR:
         for (FilterQueryTree child : children) {
-          if (!pruneSegment(child, columnMetadataMap)) {
+          if (!pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
             return false;
           }
         }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index a2953ce..e94af5c 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -23,8 +23,13 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.query.request.ServerQueryRequest;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import javax.annotation.Nonnull;
 import org.apache.commons.configuration.Configuration;
 
@@ -48,7 +53,17 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
     // For realtime segment, this map can be null.
     Map<String, ColumnMetadata> columnMetadataMap =
         ((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap();
-    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap);
+
+    Map<String, BloomFilterReader> bloomFilterMap = new HashMap<>();
+    if (columnMetadataMap != null) {
+      for (String column : columnMetadataMap.keySet()) {
+        BloomFilterReader bloomFilterReader = segment.getDataSource(column).getBloomFilter();
+        if (bloomFilterReader != null) {
+          bloomFilterMap.put(column, bloomFilterReader);
+        }
+      }
+    }
+    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, bloomFilterMap);
   }
 
   @Override
@@ -74,7 +89,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
   @SuppressWarnings("unchecked")
   @Override
   public boolean pruneSegment(@Nonnull FilterQueryTree filterQueryTree,
-      @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+      @Nonnull Map<String, ColumnMetadata> columnMetadataMap, Map<String, BloomFilterReader> bloomFilterMap) {
     FilterOperator filterOperator = filterQueryTree.getOperator();
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
@@ -86,7 +101,8 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
         return false;
       }
 
-      ColumnMetadata columnMetadata = columnMetadataMap.get(filterQueryTree.getColumn());
+      String column = filterQueryTree.getColumn();
+      ColumnMetadata columnMetadata = columnMetadataMap.get(column);
       if (columnMetadata == null) {
         // Should not reach here after DataSchemaSegmentPruner
         return true;
@@ -97,16 +113,22 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
 
       if (filterOperator == FilterOperator.EQUALITY) {
         // EQUALITY
+        boolean pruneSegment = false;
+        FieldSpec.DataType dataType = columnMetadata.getDataType();
+        Comparable value = getValue(filterQueryTree.getValue().get(0), dataType);
 
-        // Doesn't have min/max value set in metadata
-        if ((minValue == null) || (maxValue == null)) {
-          return false;
+        // Check if the value is in the min/max range
+        if (minValue != null && maxValue != null) {
+          pruneSegment = (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0);
         }
 
-        // Check if the value is in the min/max range
-        FieldSpec.DataType dataType = columnMetadata.getDataType();
-        Comparable value = getValue(filterQueryTree.getValue().get(0), dataType);
-        return (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0);
+        // If the bloom filter is available for the column, check if the value may exist
+        if (!pruneSegment && bloomFilterMap.containsKey(column)) {
+          BloomFilterReader bloomFilterReader = bloomFilterMap.get(column);
+          pruneSegment = !bloomFilterReader.mightContain(value);
+        }
+
+        return pruneSegment;
       } else {
         // RANGE
 
@@ -170,7 +192,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
       }
     } else {
       // Parent node
-      return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+      return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
     }
   }
 }
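
The net effect of the new EQUALITY branch: a segment can now be pruned even when the queried value lies inside the column's [min, max] range, as long as the bloom filter reports it absent. An isolated sketch with hypothetical values:

    BloomFilter filter = SegmentBloomFilterFactory.createSegmentBloomFilter(3, 0.05);
    filter.add(15);
    filter.add(77);
    filter.add(100);

    // 42 lies inside the [15, 100] min/max range, so range pruning alone keeps the segment,
    // but the bloom filter can still prune it (barring a false positive, here at most ~5%).
    boolean canPrune = !filter.mightContain(42);
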
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
index f8e1cd9..5f35b7f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
@@ -22,6 +22,9 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
 import com.linkedin.pinot.core.query.request.ServerQueryRequest;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.configuration.Configuration;
@@ -59,7 +62,7 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner {
     Map<String, ColumnMetadata> columnMetadataMap =
         ((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap();
 
-    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap);
+    return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, Collections.emptyMap());
   }
 
   /**
@@ -74,12 +77,13 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner {
    * @return True if segment can be pruned, false otherwise
    */
   @Override
-  public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap) {
+  public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap,
+      Map<String, BloomFilterReader> bloomFilterMap) {
     List<FilterQueryTree> children = filterQueryTree.getChildren();
 
     // Non-leaf node
     if (children != null && !children.isEmpty()) {
-      return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+      return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
     }
 
     // TODO: Enhance partition based pruning for RANGE operator.
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
index 772d5a3..c3facda 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
@@ -57,6 +57,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+    public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
   }
 
   public static class MetadataKeys {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
new file mode 100644
index 0000000..c7d1293
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.creator.impl.bloom;
+
+import com.linkedin.pinot.core.bloom.BloomFilterUtil;
+import com.linkedin.pinot.core.bloom.BloomFilter;
+import com.linkedin.pinot.core.bloom.SegmentBloomFilterFactory;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Bloom filter creator
+ *
+ * Note:
+ * 1. Currently, we limit the filter size to 1MB to avoid the heap overhead. We can remove it once we have the offheap
+ *    implementation of the bloom filter.
+ * 2. When capping the bloom filter at 1MB, the max false positive probability grows steeply beyond 1 million
+ *    cardinality. Using a bloom filter is not recommended for columns with cardinality above 5 million, since
+ *    maxFalsePosProb already reaches 0.45 at the 1MB filter size.
+ */
+public class BloomFilterCreator implements AutoCloseable {
+  private static final double DEFAULT_MAX_FALSE_POS_PROBABILITY = 0.05;
+  private static final int MB_IN_BITS = 8388608;
+
+  private BloomFilter _bloomFilter;
+  private File _bloomFilterFile;
+
+  public BloomFilterCreator(File indexDir, String columnName, int cardinality) {
+    _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    double maxFalsePosProbability =
+        BloomFilterUtil.computeMaxFalsePositiveProbabilityForNumBits(cardinality, MB_IN_BITS,
+            DEFAULT_MAX_FALSE_POS_PROBABILITY);
+    _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(cardinality, maxFalsePosProbability);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) {
+      outputStream.writeInt(_bloomFilter.getBloomFilterType().getValue());
+      outputStream.writeInt(_bloomFilter.getVersion());
+      _bloomFilter.writeTo(outputStream);
+    }
+  }
+
+  public void add(Object input) {
+    _bloomFilter.add(input.toString());
+  }
+}
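
A usage sketch for the creator, with a hypothetical directory, column, and values; closing the creator flushes the type value, the version, and then the filter payload to <column>.bloom:

    File indexDir = new File("/tmp/segmentIndexDir"); // hypothetical path
    try (BloomFilterCreator creator = new BloomFilterCreator(indexDir, "memberId", 3)) {
      creator.add("a");
      creator.add("b");
      creator.add("c");
    } // close() writes memberId.bloom
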
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
index 4b1c8af..5d8f164 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -555,6 +555,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     return column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
   }
 
+  @Override
+  public String getBloomFilterFileName(String column) {
+    return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
+  }
+
   @Nullable
   @Override
   public String getCreatorName() {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
index 8850ed2..6e4e515 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.segment.index.column;
 
 import com.linkedin.pinot.core.io.reader.DataFileReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -39,4 +40,6 @@ public interface ColumnIndexContainer {
    * Returns the dictionary for the column, or {@code null} if it does not exist.
    */
   Dictionary getDictionary();
+  /** Returns the bloom filter reader for the column, or {@code null} if it does not exist. */
+  BloomFilterReader getBloomFilter();
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index fb78751..29ef748 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -27,6 +27,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
 import com.linkedin.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.BytesDictionary;
 import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
 import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
@@ -44,6 +45,7 @@ import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
 import com.linkedin.pinot.core.segment.store.ColumnIndexType;
 import com.linkedin.pinot.core.segment.store.SegmentDirectory;
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,18 +56,29 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
   private final ImmutableDictionaryReader _dictionary;
+  private final BloomFilterReader _bloomFilterReader;
 
   public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
       IndexLoadingConfig indexLoadingConfig) throws IOException {
     String columnName = metadata.getColumnName();
     boolean loadInvertedIndex = false;
     boolean loadOnHeapDictionary = false;
+    boolean loadBloomFilter = false;
     if (indexLoadingConfig != null) {
       loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
       loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
+      loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
     }
     PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
+
     if (metadata.hasDictionary()) {
+      // Bloom filter
+      if (loadBloomFilter) {
+        PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
+        _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer);
+      } else {
+        _bloomFilterReader = null;
+      }
       // Dictionary-based index
       _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
           loadOnHeapDictionary);
@@ -100,6 +113,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
       _invertedIndex = null;
       _dictionary = null;
+      _bloomFilterReader = null;
     }
   }
 
@@ -118,6 +132,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     return _dictionary;
   }
 
+  @Override
+  public BloomFilterReader getBloomFilter() {
+    return _bloomFilterReader;
+  }
+
+
   private static ImmutableDictionaryReader loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
       boolean loadOnHeap) {
     FieldSpec.DataType dataType = metadata.getDataType();
@@ -175,4 +195,5 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
         throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
     }
   }
+
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
index 16c79f4..6f89e71 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -30,6 +30,7 @@ import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
 import com.linkedin.pinot.core.realtime.impl.dictionary.MutableDictionary;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -44,6 +45,7 @@ public final class ColumnDataSource extends DataSource {
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
   private final Dictionary _dictionary;
+  private final BloomFilterReader _bloomFilter;
   private final int _cardinality;
   private final DataSourceMetadata _metadata;
 
@@ -53,21 +55,22 @@ public final class ColumnDataSource extends DataSource {
   public ColumnDataSource(ColumnIndexContainer indexContainer, ColumnMetadata metadata) {
     this(metadata.getColumnName(), metadata.getDataType(), metadata.isSingleValue(), metadata.isSorted(),
         metadata.getTotalDocs(), metadata.getMaxNumberOfMultiValues(), indexContainer.getForwardIndex(),
-        indexContainer.getInvertedIndex(), indexContainer.getDictionary(), metadata.getCardinality());
+        indexContainer.getInvertedIndex(), indexContainer.getDictionary(), indexContainer.getBloomFilter(),
+        metadata.getCardinality());
   }
 
   /**
    * For REALTIME segment.
    */
   public ColumnDataSource(FieldSpec fieldSpec, int numDocs, int maxNumMultiValues, DataFileReader forwardIndex,
-      InvertedIndexReader invertedIndex, MutableDictionary dictionary) {
+      InvertedIndexReader invertedIndex, MutableDictionary dictionary, BloomFilterReader bloomFilter) {
     this(fieldSpec.getName(), fieldSpec.getDataType(), fieldSpec.isSingleValueField(), false, numDocs,
-        maxNumMultiValues, forwardIndex, invertedIndex, dictionary, Constants.UNKNOWN_CARDINALITY);
+        maxNumMultiValues, forwardIndex, invertedIndex, dictionary, bloomFilter, Constants.UNKNOWN_CARDINALITY);
   }
 
   private ColumnDataSource(String columnName, FieldSpec.DataType dataType, boolean isSingleValue, boolean isSorted,
       int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, InvertedIndexReader invertedIndex,
-      Dictionary dictionary, int cardinality) {
+      Dictionary dictionary, BloomFilterReader bloomFilterReader, int cardinality) {
     // Sanity check
     if (isSingleValue) {
       Preconditions.checkState(forwardIndex instanceof SingleColumnSingleValueReader);
@@ -93,6 +96,7 @@ public final class ColumnDataSource extends DataSource {
     _forwardIndex = forwardIndex;
     _invertedIndex = invertedIndex;
     _dictionary = dictionary;
+    _bloomFilter = bloomFilterReader;
     _cardinality = cardinality;
 
     _metadata = new DataSourceMetadata() {
@@ -154,6 +158,11 @@ public final class ColumnDataSource extends DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return _bloomFilter;
+  }
+
+  @Override
   protected Block getNextBlock() {
     if (_isSingleValue) {
       return new SingleValueBlock((SingleColumnSingleValueReader) _forwardIndex, _numDocs, _dataType, _dictionary);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6381014..7c54c90 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -44,6 +44,8 @@ public class IndexLoadingConfig {
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
+  private Set<String> _bloomFilterColumns = new HashSet<>();
+
   private SegmentVersion _segmentVersion;
   // This value will remain true only when the empty constructor is invoked.
   private boolean _enableDefaultColumns = true;
@@ -76,6 +78,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
+    if (bloomFilterColumns != null) {
+      _bloomFilterColumns.addAll(bloomFilterColumns);
+    }
+
     List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
     if (noDictionaryColumns != null) {
       _noDictionaryColumns.addAll(noDictionaryColumns);
@@ -164,6 +171,12 @@ public class IndexLoadingConfig {
   }
 
   @VisibleForTesting
+  public void setBloomFilterColumns(@Nonnull Set<String> bloomFilterColumns) {
+    _bloomFilterColumns = bloomFilterColumns;
+  }
+
+
+  @VisibleForTesting
   public void setOnHeapDictionaryColumns(@Nonnull Set<String> onHeapDictionaryColumns) {
     _onHeapDictionaryColumns = onHeapDictionaryColumns;
   }
@@ -183,6 +196,10 @@ public class IndexLoadingConfig {
     return _onHeapDictionaryColumns;
   }
 
+  public Set<String> getBloomFilterColumns() {
+    return _bloomFilterColumns;
+  }
+
   @Nullable
   public SegmentVersion getSegmentVersion() {
     return _segmentVersion;
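
For tests, the new bloom filter columns can be injected directly; a sketch assuming the empty constructor and a hypothetical column name:

    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
    indexLoadingConfig.setBloomFilterColumns(new HashSet<>(Collections.singletonList("memberId")));
    // PhysicalColumnIndexContainer will then load the BLOOM_FILTER buffer for "memberId".
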
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 6cf064d..7ee16ba 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -19,6 +19,7 @@ import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.segment.ReadMode;
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.loader.bloomfilter.BloomFilterHandler;
 import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator;
 import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
 import com.linkedin.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
@@ -63,7 +64,6 @@ public class SegmentPreProcessor implements AutoCloseable {
     if (_segmentMetadata.getTotalDocs() == 0) {
       return;
     }
-
     // Remove all the existing inverted index temp files before loading segments.
     // NOTE: This step fixes the issue of temporary files not getting deleted after creating new inverted indexes.
     // In this, we look for all files in the directory and remove the ones with  '.bitmap.inv.tmp' extension.
@@ -91,6 +91,11 @@ public class SegmentPreProcessor implements AutoCloseable {
           new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
       invertedIndexHandler.createInvertedIndices();
 
+      // Create bloom filter if required
+      BloomFilterHandler bloomFilterHandler =
+          new BloomFilterHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+      bloomFilterHandler.createBloomFilters();
+
       // Add min/max value to column metadata according to the prune mode.
       // For star-tree index, because it can only increase the range, so min/max value can still be used in pruner.
       ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode =
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
new file mode 100644
index 0000000..2f6991e
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.loader.bloomfilter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.pinot.common.data.FieldSpec.DataType;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
+import com.linkedin.pinot.core.segment.index.loader.LoaderUtils;
+import com.linkedin.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
+import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
+import com.linkedin.pinot.core.segment.index.readers.ImmutableDictionaryReader;
+import com.linkedin.pinot.core.segment.index.readers.IntDictionary;
+import com.linkedin.pinot.core.segment.index.readers.LongDictionary;
+import com.linkedin.pinot.core.segment.index.readers.StringDictionary;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+import com.linkedin.pinot.core.segment.store.ColumnIndexType;
+import com.linkedin.pinot.core.segment.store.SegmentDirectory;
+
+
+public class BloomFilterHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>();
+
+  public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
+      @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    for (String column : indexLoadingConfig.getBloomFilterColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null) {
+        _bloomFilterColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createBloomFilters() throws Exception {
+    for (ColumnMetadata columnMetadata : _bloomFilterColumns) {
+      if (columnMetadata.hasDictionary()) {
+        createBloomFilterForColumn(columnMetadata);
+      }
+    }
+  }
+
+  private void createBloomFilterForColumn(ColumnMetadata columnMetadata) throws Exception {
+    String columnName = columnMetadata.getColumnName();
+
+    File bloomFilterFileInProgress = new File(_indexDir, columnName + ".bloom.inprogress");
+    File bloomFilterFile = new File(_indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+    if (!bloomFilterFileInProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      if (_segmentWriter.hasIndexFor(columnName, ColumnIndexType.BLOOM_FILTER)) {
+        // Skip creating bloom filter index if already exists.
+        LOGGER.info("Found bloom filter for segment: {}, column: {}", _segmentName, columnName);
+        return;
+      }
+      // Create a marker file.
+      FileUtils.touch(bloomFilterFileInProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+
+      // Remove bloom filter file.
+      FileUtils.deleteQuietly(bloomFilterFile);
+    }
+
+    // Create new bloom filter for the column.
+    LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, columnName);
+    try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnName,
+        columnMetadata.getCardinality())) {
+      if (columnMetadata.hasDictionary()) {
+        // Read dictionary
+        try (ImmutableDictionaryReader dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) {
+          for (int i = 0; i < dictionaryReader.length(); i++) {
+            creator.add(dictionaryReader.get(i));
+          }
+        }
+      } else {
+        // Read the forward index
+        throw new UnsupportedOperationException("Bloom filters are not supported for no-dictionary columns");
+      }
+    }
+
+    // For v3, write the generated bloom filter file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(bloomFilterFileInProgress);
+    LOGGER.info("Created bloom filter for segment: {}, column: {}", _segmentName, columnName);
+  }
+
+  private ImmutableDictionaryReader getDictionaryReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter) throws IOException {
+    PinotDataBuffer dictionaryBuffer =
+        segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    int cardinality = columnMetadata.getCardinality();
+    ImmutableDictionaryReader dictionaryReader;
+    DataType dataType = columnMetadata.getDataType();
+    switch (dataType) {
+      case INT:
+        dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality);
+        break;
+      case LONG:
+        dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality);
+        break;
+      case FLOAT:
+        dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality);
+        break;
+      case DOUBLE:
+        dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality);
+        break;
+      case STRING:
+        dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
+            (byte) columnMetadata.getPaddingCharacter());
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName());
+    }
+    return dictionaryReader;
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
new file mode 100644
index 0000000..349640f
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.readers;
+
+import com.linkedin.pinot.core.bloom.BloomFilterType;
+import com.linkedin.pinot.core.bloom.BloomFilter;
+import com.linkedin.pinot.core.bloom.SegmentBloomFilterFactory;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+
+/**
+ * Bloom filter reader
+ */
+public class BloomFilterReader {
+
+  private BloomFilter _bloomFilter;
+
+  public BloomFilterReader(PinotDataBuffer bloomFilterBuffer) throws IOException {
+    byte[] buffer = new byte[(int) bloomFilterBuffer.size()];
+    bloomFilterBuffer.copyTo(0, buffer);
+
+    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
+      BloomFilterType bloomFilterType = BloomFilterType.valueOf(in.readInt());
+      int version = in.readInt();
+      _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(bloomFilterType);
+      if (version != _bloomFilter.getVersion()) {
+        throw new IOException(
+            "Unexpected bloom filter version (type: " + bloomFilterType.toString() + ", version: " + version);
+      }
+      _bloomFilter.readFrom(in);
+    }
+  }
+
+  public boolean mightContain(Object key) {
+    return _bloomFilter.mightContain(key.toString());
+  }
+}
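
For callers, the reader reduces to one membership probe: keys are stringified, matching how dictionary values are added at creation time, and a negative answer is definitive. A hedged sketch of the same read path using Guava directly, under the framing assumed above (type id, then version, then the Guava-serialized body):

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class BloomFilterReadSketch {
      public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream("testColumn.bloom"))) {
          if (in.readInt() != 1 || in.readInt() != 1) {  // hypothetical type id and version
            throw new IOException("Unexpected bloom filter type or version");
          }
          BloomFilter<CharSequence> filter =
              BloomFilter.readFrom(in, Funnels.stringFunnel(StandardCharsets.UTF_8));
          // false means "definitely absent"; true only means "possibly present"
          System.out.println(filter.mightContain("someDictionaryValue"));
        }
      }
    }
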
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
index 3579f1c..2973969 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
@@ -93,7 +93,14 @@ abstract class ColumnIndexDirectory implements Closeable {
    */
   public abstract PinotDataBuffer getInvertedIndexBufferFor(String column)
       throws IOException;
-
+  /**
+   * Get bloom filter buffer for a column
+   * @param column column name
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
+  public abstract PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException;
   /**
    * Allocate a new data buffer of specified sizeBytes in the columnar index directory
    * @param column column name
@@ -121,6 +128,15 @@ abstract class ColumnIndexDirectory implements Closeable {
    */
   public abstract PinotDataBuffer newInvertedIndexBuffer(String column, long sizeBytes)
       throws IOException;
+  /**
+   * Allocate a new data buffer of specified sizeBytes in the columnar index directory
+   * @param column column name
+   * @param sizeBytes sizeBytes for the buffer allocation
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
+  public abstract PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+      throws IOException;
 
   /**
    * Check if an index exists for a column
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
index 85d38f7..5f7e008 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
@@ -18,7 +18,8 @@ package com.linkedin.pinot.core.segment.store;
 public enum ColumnIndexType {
   DICTIONARY("dictionary"),
   FORWARD_INDEX("forward_index"),
-  INVERTED_INDEX("inverted_index");
+  INVERTED_INDEX("inverted_index"),
+  BLOOM_FILTER("bloom_filter");
 
   private final String indexName;
   ColumnIndexType(String name) {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
index c6ea2d9..8599cde 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -81,6 +81,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+    return getReadBufferFor(key);
+  }
+
+  @Override
+  public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+    return getWriteBufferFor(key, sizeBytes);
+  }
+
+  @Override
   public boolean hasIndexFor(String column, ColumnIndexType type) {
     File indexFile = getFileFor(column, type);
     return indexFile.exists();
@@ -140,6 +154,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case INVERTED_INDEX:
         filename = metadata.getBitmapInvertedIndexFileName(column);
         break;
+      case BLOOM_FILTER:
+        filename = metadata.getBloomFilterFileName(column);
+        break;
       default:
         throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
     }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
index d29c37c..3705a82 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -233,6 +233,9 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
       case INVERTED_INDEX:
         buffer = columnIndexDirectory.getInvertedIndexBufferFor(column);
         break;
+      case BLOOM_FILTER:
+        buffer = columnIndexDirectory.getBloomFilterBufferFor(column);
+        break;
       default:
         throw new RuntimeException("Unknown index type: " + type.name());
     }
@@ -412,6 +415,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
           return columnIndexDirectory.newForwardIndexBuffer(key.name, sizeBytes);
         case INVERTED_INDEX:
           return columnIndexDirectory.newInvertedIndexBuffer(key.name, sizeBytes);
+        case BLOOM_FILTER:
+          return columnIndexDirectory.newBloomFilterBuffer(key.name, sizeBytes);
         default:
           throw new RuntimeException("Unknown index type: " + indexType.name() +
               " for directory: " + segmentDirectory);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
index cf22cc6..5271985 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
@@ -105,6 +105,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getBloomFilterBufferFor(String column)
+      throws IOException {
+    return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER);
+  }
+
+  @Override
   public boolean hasIndexFor(String column, ColumnIndexType type) {
     IndexKey key = new IndexKey(column, type);
     return columnEntries.containsKey(key);
@@ -128,6 +134,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
     return  allocNewBufferInternal(column, ColumnIndexType.INVERTED_INDEX, sizeBytes, "inverted_index.create");
   }
 
+  @Override
+  public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+      throws IOException {
+    return  allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, sizeBytes, "bloom_filter.create");
+  }
+
   private PinotDataBuffer checkAndGetIndexBuffer(String column, ColumnIndexType type) {
     IndexKey key = new IndexKey(column, type);
     IndexEntry entry = columnEntries.get(key);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index aedd6db..f3d1c52 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.core.segment.virtualcolumn;
 import com.linkedin.pinot.core.io.reader.DataFileReader;
 import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
 import com.linkedin.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 
@@ -51,4 +52,9 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
   public Dictionary getDictionary() {
     return _dictionary;
   }
+
+  @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
index b450ff5..8959969 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
@@ -21,6 +21,7 @@ import com.linkedin.pinot.core.common.DataSource;
 import com.linkedin.pinot.core.common.DataSourceMetadata;
 import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
 import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -100,6 +101,11 @@ public class StarTreeDimensionDataSource extends DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
+
+  @Override
   public Dictionary getDictionary() {
     return _dictionary;
   }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
index 31f742a..f9ac54d 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
@@ -24,6 +24,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.BaseChunkSingleValueReader;
 import com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader;
 import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.core.segment.index.readers.Dictionary;
 import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
 import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -110,6 +111,11 @@ public class StarTreeMetricDataSource extends DataSource {
   }
 
   @Override
+  public BloomFilterReader getBloomFilter() {
+    return null;
+  }
+
+  @Override
   protected Block getNextBlock() {
     return new SingleValueBlock(_forwardIndex, _numDocs, _dataType, null);
   }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
index a278149..52b8542 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -86,10 +86,10 @@ public class RealtimeNoDictionaryTest {
     }
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
-    dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null));
-    dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null));
-    dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null));
-    dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null));
+    dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null, null));
+    dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null, null));
+    dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null, null));
+    dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
new file mode 100644
index 0000000..4a77a5d
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.creator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.BloomFilter;
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.core.bloom.BloomFilterType;
+import com.linkedin.pinot.core.bloom.GuavaOnHeapBloomFilter;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import com.linkedin.pinot.core.bloom.BloomFilterUtil;
+
+import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.hash.Funnels;
+
+
+public class BloomFilterCreatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BloomFilterCreatorTest");
+  private static final int MB_IN_BYTES = 1024 * 1024;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    if (TEMP_DIR.exists()) {
+      FileUtils.deleteQuietly(TEMP_DIR);
+    }
+    TEMP_DIR.mkdir();
+  }
+
+  @Test
+  public void testBloomFilterUtil() {
+    // Test against the known results
+    Assert.assertEquals(BloomFilterUtil.computeNumBits(1000000, 0.03), 7298441);
+    Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.03), 72984409);
+    Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.1), 47925292);
+
+    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(1000000, 7298441), 5);
+    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 72984409), 5);
+    Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 47925292), 3);
+
+    double threshold = 0.001;
+    Assert.assertTrue(
+        compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(1000000, 5, 7298441), 0.03, threshold));
+    Assert.assertTrue(
+        compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 5, 72984409), 0.03, threshold));
+    Assert.assertTrue(
+        compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 3, 47925292), 0.1, threshold));
+  }
+
+  private boolean compareDouble(double a, double b, double threshold) {
+    return Math.abs(a - b) < threshold;
+  }
+
+  @Test
+  public void testBloomFilterCreator() throws Exception {
+    // Create bloom filter directory
+    File bloomFilterDir = new File(TEMP_DIR, "bloomFilterDir");
+    bloomFilterDir.mkdirs();
+
+    // Create a bloom filter and serialize it to a file
+    int cardinality = 10000;
+    String columnName = "testColumn";
+    BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(bloomFilterDir, columnName, cardinality);
+    for (int i = 0; i < 5; i++) {
+      bloomFilterCreator.add(Integer.toString(i));
+    }
+    bloomFilterCreator.close();
+
+    // Deserialize the bloom filter and validate
+    File bloomFilterFile = new File(bloomFilterDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+    try (DataInputStream in = new DataInputStream(new FileInputStream(bloomFilterFile))) {
+      BloomFilterType type = BloomFilterType.valueOf(in.readInt());
+      int version = in.readInt();
+      GuavaOnHeapBloomFilter bloomFilter = new GuavaOnHeapBloomFilter();
+
+      Assert.assertEquals(type, bloomFilter.getBloomFilterType());
+      Assert.assertEquals(version, bloomFilter.getVersion());
+
+      bloomFilter.readFrom(in);
+      for (int i = 0; i < 5; i++) {
+        Assert.assertTrue(bloomFilter.mightContain(Integer.toString(i)));
+      }
+      for (int j = 5; j < 10; j++) {
+        Assert.assertFalse(bloomFilter.mightContain(Integer.toString(j)));
+      }
+    }
+  }
+
+  @Test
+  public void testBloomFilterSize() throws Exception {
+    int[] cardinalityArray = new int[]{10, 100, 1000, 100000, 100000, 1000000, 5000000, 10000000};
+    for (int cardinality : cardinalityArray) {
+      FileUtils.deleteQuietly(TEMP_DIR);
+      File indexDir = new File(TEMP_DIR, "testBloomFilterSize");
+      Preconditions.checkState(indexDir.mkdirs());
+
+      String columnName = "testSize";
+      BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(indexDir, columnName, cardinality);
+      bloomFilterCreator.close();
+
+      File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+      try (InputStream inputStream = new FileInputStream(bloomFilterFile)) {
+        byte[] bloomFilterBytes = IOUtils.toByteArray(inputStream);
+        long actualBloomFilterSize = bloomFilterBytes.length;
+        // Check that the size of the bloom filter does not exceed 1MB. Note that the Guava bloom
+        // filter serialization adds 11-12 bytes of overhead.
+        Assert.assertTrue(actualBloomFilterSize < MB_IN_BYTES + 12);
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
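
The expected values in testBloomFilterUtil follow the textbook sizing formulas m = -n*ln(p)/(ln 2)^2 bits and k = (m/n)*ln 2 hash functions. A small sketch reproducing them (standard formulas only; BloomFilterUtil's exact rounding is an assumption and may differ by a bit):

    public class BloomSizingSketch {
      public static void main(String[] args) {
        long n = 1_000_000L;  // expected insertions (cardinality)
        double p = 0.03;      // max false positive probability
        // m = -n*ln(p)/(ln 2)^2: number of bits in the filter
        long m = (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
        // k = (m/n)*ln 2: optimal number of hash functions
        int k = (int) Math.round((double) m / n * Math.log(2));
        // Prints roughly 7298442 bits and 5 hashes, matching the test's
        // expected 7298441 and 5 up to rounding.
        System.out.println("bits = " + m + ", hashes = " + k);
      }
    }
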
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
index 7712ec0..a8851d0 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
@@ -50,6 +50,9 @@ public class ColumnIndexDirectoryTestHelper {
       case INVERTED_INDEX:
         buf = columnDirectory.newInvertedIndexBuffer(columnName, size);
         break;
+      case BLOOM_FILTER:
+        buf = columnDirectory.newBloomFilterBuffer(columnName, size);
+        break;
     }
     return buf;
   }
@@ -70,6 +73,10 @@ public class ColumnIndexDirectoryTestHelper {
       case INVERTED_INDEX:
         buf = columnDirectory.getInvertedIndexBufferFor(columnName);
         break;
+      case BLOOM_FILTER:
+        buf = columnDirectory.getBloomFilterBufferFor(columnName);
+        break;
     }
     return buf;
   }
@@ -130,6 +137,14 @@ public class ColumnIndexDirectoryTestHelper {
             return invocationOnMock.getArguments()[0] + ".ii";
           }
         });
+    when(meta.getBloomFilterFileName(anyString()))
+        .thenAnswer(new Answer<String>() {
+          @Override
+          public String answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+            return invocationOnMock.getArguments()[0] + ".bloom";
+          }
+        });
     return meta;
   }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
index 22fc481..1e3fba1 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -21,6 +21,7 @@ import com.linkedin.pinot.common.utils.request.FilterQueryTree;
 import com.linkedin.pinot.common.utils.request.RequestUtils;
 import com.linkedin.pinot.core.query.pruner.ColumnValueSegmentPruner;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
 import com.linkedin.pinot.pql.parsers.Pql2Compiler;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
 public class ColumnValueSegmentPrunerTest {
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
   private static final Map<String, ColumnMetadata> COLUMN_METADATA_MAP = new HashMap<>();
+  private static final Map<String, BloomFilterReader> BLOOM_FILTER_MAP = new HashMap<>();
 
   static {
     COLUMN_METADATA_MAP.put("time", new ColumnMetadata.Builder().setColumnName("time")
@@ -95,6 +97,6 @@ public class ColumnValueSegmentPrunerTest {
     BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
     FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
     ColumnValueSegmentPruner pruner = new ColumnValueSegmentPruner();
-    return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP);
+    return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP, BLOOM_FILTER_MAP);
   }
 }
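
The new map argument is what lets ColumnValueSegmentPruner consult per-column bloom filters on equality predicates: a filter miss proves the value is absent from the segment, so the segment can be skipped without touching its indexes. A hedged sketch of that check in isolation (class and method names here are illustrative; the real pruneSegment also applies min/max based pruning):

    import java.util.Map;
    import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;

    final class BloomFilterPruneSketch {
      // Bloom filters can return false positives but never false negatives,
      // so mightContain == false is a safe reason to prune the segment.
      static boolean canPrune(Map<String, BloomFilterReader> bloomFilters, String column, String value) {
        BloomFilterReader reader = bloomFilters.get(column);
        return reader != null && !reader.mightContain(value);
      }
    }
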
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
index 7c6eec5..01e7a0c 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -58,6 +58,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   private static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
   private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
       Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+  private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
@@ -152,6 +153,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   @Nullable
+  protected List<String> getBloomFilterIndexColumns() {
+    return DEFAULT_BLOOM_FILTER_COLUMNS;
+  }
+
+  @Nullable
   protected List<String> getRawIndexColumns() {
     return DEFAULT_RAW_INDEX_COLUMNS;
   }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index e931fd0..7803874 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -279,15 +279,15 @@ public abstract class ClusterTest extends ControllerTest {
   }
 
   protected void addOfflineTable(String tableName, SegmentVersion segmentVersion) throws Exception {
-    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null);
+    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null);
   }
 
   protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      TableTaskConfig taskConfig) throws Exception {
+      List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, taskConfig);
+            invertedIndexColumns, bloomFilterColumns, taskConfig);
 
     if (!isUsingNewConfigFormat()) {
       sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
@@ -298,10 +298,10 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      TableTaskConfig taskConfig) throws Exception {
+      List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, taskConfig);
+            invertedIndexColumns, bloomFilterColumns, taskConfig);
 
     if (!isUsingNewConfigFormat()) {
       sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString());
@@ -312,7 +312,7 @@ public abstract class ClusterTest extends ControllerTest {
 
   private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
       String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
-      List<String> invertedIndexColumns, TableTaskConfig taskConfig) throws Exception {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception {
     return new TableConfig.Builder(Helix.TableType.OFFLINE).setTableName(tableName)
         .setTimeColumnName(timeColumnName)
         .setTimeType(timeType)
@@ -322,6 +322,7 @@ public abstract class ClusterTest extends ControllerTest {
         .setLoadMode(loadMode)
         .setSegmentVersion(segmentVersion.toString())
         .setInvertedIndexColumns(invertedIndexColumns)
+        .setBloomFilterColumns(bloomFilterColumns)
         .setTaskConfig(taskConfig)
         .build();
   }
@@ -374,8 +375,8 @@ public abstract class ClusterTest extends ControllerTest {
   protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
       String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> noDictionaryColumns, TableTaskConfig taskConfig,
-      String streamConsumerFactoryName) throws Exception {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
+      TableTaskConfig taskConfig, String streamConsumerFactoryName) throws Exception {
     Map<String, String> streamConfigs = new HashMap<>();
     String streamType = "kafka";
     streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
@@ -417,6 +418,7 @@ public abstract class ClusterTest extends ControllerTest {
         .setLoadMode(loadMode)
         .setSortedColumn(sortedColumn)
         .setInvertedIndexColumns(invertedIndexColumns)
+        .setBloomFilterColumns(bloomFilterColumns)
         .setNoDictionaryColumns(noDictionaryColumns)
         .setStreamConfigs(streamConfigs)
         .setTaskConfig(taskConfig)
@@ -437,13 +439,13 @@ public abstract class ClusterTest extends ControllerTest {
   protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
       String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> noDictionaryColumns, TableTaskConfig taskConfig,
-      String streamConsumerFactoryName) throws Exception {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
+      TableTaskConfig taskConfig, String streamConsumerFactoryName) throws Exception {
     addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
-        invertedIndexColumns, taskConfig);
+        invertedIndexColumns, bloomFilterColumns, taskConfig);
     addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
         timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        noDictionaryColumns, taskConfig, streamConsumerFactoryName);
+        bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
   }
 
   protected void createBrokerTenant(String tenantName, int brokerCount) throws Exception {
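
For reference, turning the feature on from a table config is one extra builder call; the sketch below mirrors getOfflineTableConfig above, with the default inverted index and bloom filter columns from BaseClusterIntegrationTest (table name and load mode are placeholder values, and import paths are assumed):

    import java.util.Arrays;
    import com.linkedin.pinot.common.config.TableConfig;
    import com.linkedin.pinot.common.utils.CommonConstants.Helix;

    final class BloomFilterTableConfigSketch {
      static TableConfig offlineTableWithBloomFilters() throws Exception {
        return new TableConfig.Builder(Helix.TableType.OFFLINE)
            .setTableName("mytable")
            .setLoadMode("MMAP")
            .setInvertedIndexColumns(Arrays.asList("FlightNum", "Origin", "Quarter"))
            .setBloomFilterColumns(Arrays.asList("FlightNum", "Origin"))  // the new setter
            .build();
      }
    }
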
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 5ad0fcf..3c4323f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -89,7 +89,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Create the table
     addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
-        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getTaskConfig());
+        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig());
 
     // Generate and push Pinot segments from Hadoop
     generateAndPushSegmentsFromHadoop();
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
index 023cae3..3c8fb8f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -134,8 +134,8 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
-        TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getRawIndexColumns(),
-        getTaskConfig(), getStreamConsumerFactoryClassName());
+        TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+        getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
 
     completeTableConfiguration();
   }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index ec22006..8010a6c 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -289,7 +289,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       String timeType = outgoingTimeUnit.toString();
       addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
           _realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
-          _sortedColumn, _invertedIndexColumns, null, null, getStreamConsumerFactoryClassName());
+          _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName());
 
       // Upload all segments
       uploadSegments(_tarDir);
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 383c95b..4e5febf 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -47,6 +47,7 @@ import org.testng.annotations.Test;
 public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 1;
+  private static final int NUM_SEGMENTS = 12;
 
   // For inverted index triggering test
   private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
@@ -54,6 +55,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
 
+  private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Arrays.asList("Carrier");
+  private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
+
   // For default columns test
   private static final String SCHEMA_WITH_EXTRA_COLUMNS =
       "On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema";
@@ -121,7 +125,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Create the table
     addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
-        getTaskConfig());
+        getBloomFilterIndexColumns(), getTaskConfig());
 
     completeTableConfiguration();
 
@@ -149,7 +153,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
-        UPDATED_INVERTED_INDEX_COLUMNS, getTaskConfig());
+        UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig());
 
     updateTableConfiguration();
 
@@ -170,6 +174,35 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 600_000L, "Failed to generate inverted index");
   }
 
+  @Test
+  public void testBloomFilterTriggering() throws Exception {
+    final long numTotalDocs = getCountStarResult();
+    JSONObject queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+    Assert.assertEquals(queryResponse.getLong("numSegmentsProcessed"), NUM_SEGMENTS);
+
+    // Update table config and trigger reload
+    updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
+        null, UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
+
+    updateTableConfiguration();
+
+    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          JSONObject queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+          // Total docs should not change during reload
+          Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+          return queryResponse.getLong("numSegmentsProcessed") == 0L;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }, 600_000L, "Failed to generate bloom filter");
+  }
+
   /**
    * We will add extra new columns to the schema to test adding new columns with default value to the offline segments.
    * <p>New columns are: (name, field type, data type, single/multi value, default null value)
@@ -362,7 +395,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   public void testBrokerResponseMetadata() throws Exception {
     super.testBrokerResponseMetadata();
   }
-  
+
   @Test
   public void testUDF() throws Exception {
     String pqlQuery = "SELECT COUNT(*) FROM mytable GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS')";
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 37a117e..d72d41e 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -95,8 +95,8 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
 
     addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, null, null,
-        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getRawIndexColumns(), getTaskConfig(),
-        getStreamConsumerFactoryClassName());
+        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
+        getTaskConfig(), getStreamConsumerFactoryClassName());
 
     completeTableConfiguration();
   }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a2cdb1a..e47c87b 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -79,9 +79,9 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
     taskTypeConfigsMap.put(TestTaskGenerator.TASK_TYPE, Collections.emptyMap());
     taskConfig.setTaskTypeConfigsMap(taskTypeConfigsMap);
-    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, taskConfig);
-    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, taskConfig);
-    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null);
+    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
+    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
+    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null);
 
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java b/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java
deleted file mode 100644
index 03cafee..0000000
--- a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.perf;
-
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriver;
-import java.util.Arrays;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>PerfBenchmarkRunner</code> class is a tool to start Pinot cluster with optional preloaded segments.
- */
-public class PerfBenchmarkRunner {
-  private PerfBenchmarkRunner() {
-  }
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(PerfBenchmarkRunner.class);
-
-  /**
-   * Start Pinot server with pre-loaded segments.
-   *
-   * @param dataDir data directory.
-   * @param tableNames list of table names to be loaded.
-   * @param invertedIndexColumns list of inverted index columns.
-   * @throws Exception
-   */
-  public static void startServerWithPreLoadedSegments(String dataDir, List<String> tableNames,
-      List<String> invertedIndexColumns)
-      throws Exception {
-    LOGGER.info("Starting server and uploading segments.");
-    PerfBenchmarkDriver driver = PerfBenchmarkDriver.startComponents(false, false, false, true, dataDir);
-    for (String tableName : tableNames) {
-      com.linkedin.pinot.tools.perf.PerfBenchmarkRunner.loadTable(driver, dataDir, tableName, invertedIndexColumns);
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length > 0) {
-      if (args[0].equalsIgnoreCase("startAllButServer") || args[0].equalsIgnoreCase("startAll")) {
-        PerfBenchmarkDriver.startComponents(true, true, true, false, null);
-      }
-
-      if (args[0].equalsIgnoreCase("startServerWithPreLoadedSegments") || args[0].equalsIgnoreCase("startAll")) {
-        String tableNames = args[1];
-        String dataDir = args[2];
-        List<String> invertedIndexColumns = null;
-        if (args.length == 4) {
-          invertedIndexColumns = Arrays.asList(args[3].split(","));
-        }
-        startServerWithPreLoadedSegments(dataDir, Arrays.asList(tableNames.split(",")), invertedIndexColumns);
-      }
-    } else {
-      LOGGER.error("Expected one of [startAll|startAllButServer|StartServerWithPreLoadedSegments]");
-    }
-  }
-}
diff --git a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java b/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java
deleted file mode 100644
index 2c503d4..0000000
--- a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.perf;
-
-import com.google.common.collect.Lists;
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriver;
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriverConf;
-import com.linkedin.pinot.tools.perf.QueryRunner;
-import java.util.ArrayList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PerfBenchmarkTest {
-  private PerfBenchmarkTest() {
-  }
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(PerfBenchmarkTest.class);
-
-  private static void setupCluster(String dataDir, String offlineTableName) throws Exception {
-    LOGGER.info("Setting up cluster");
-    PerfBenchmarkDriver.startComponents(true, true, true, false, null);
-    PerfBenchmarkRunner.startServerWithPreLoadedSegments(dataDir, Lists.newArrayList(offlineTableName),
-        new ArrayList<String>());
-  }
-
-  private static void runQueries(String queryFile) throws Exception {
-    System.out.println("Running queries....");
-    PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
-    conf.setStartBroker(false);
-    conf.setStartController(false);
-    conf.setStartServer(false);
-    conf.setStartZookeeper(false);
-    conf.setUploadIndexes(false);
-    conf.setRunQueries(true);
-    conf.setConfigureResources(false);
-
-    QueryRunner.singleThreadedQueryRunner(conf, queryFile, 1, 3000, 10);
-    LOGGER.info("Running queries completed.");
-  }
-
-  private static void execute(String dataDir, String offlineTableName, String queryFile) throws Exception {
-    setupCluster(dataDir, offlineTableName);
-    runQueries(queryFile);
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 3) {
-      LOGGER.error("Incorrect number of arguments.");
-      LOGGER.info("PerfBenchmarkTest <DataDir> <offlineTableName> <PathToQueryFile>");
-      return;
-    }
-
-    try {
-      PerfBenchmarkTest.execute(args[0], args[1], args[2]);
-      LOGGER.info("Benchmark test completed");
-      System.exit(0);
-    } catch (Exception e) {
-      LOGGER.error(e.getMessage());
-      e.printStackTrace();
-      System.exit(1);
-    }
-  }
-}
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
index 999f022..2368d8a 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.tools.perf;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
@@ -259,10 +260,10 @@ public class PerfBenchmarkDriver {
 
   public void configureTable(String tableName)
       throws Exception {
-    configureTable(tableName, null);
+    configureTable(tableName, null, null);
   }
 
-  public void configureTable(String tableName, List<String> invertedIndexColumns)
+  public void configureTable(String tableName, List<String> invertedIndexColumns, List<String> bloomFilterColumns)
       throws Exception {
     TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
         .setSegmentAssignmentStrategy(_segmentAssignmentStrategy)
@@ -272,6 +273,7 @@ public class PerfBenchmarkDriver {
         .setLoadMode(_loadMode)
         .setSegmentVersion(_segmentFormatVersion)
         .setInvertedIndexColumns(invertedIndexColumns)
+        .setBloomFilterColumns(bloomFilterColumns)
         .build();
     _helixResourceManager.addTable(tableConfig);
   }
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
index ac124fe..17fec66 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
@@ -72,6 +72,10 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
       usage = "Comma separated inverted index columns to be created (non-batch load).")
   private String _invertedIndexColumns;
 
+  @Option(name = "-bloomFilterColumns", required = false, metaVar = "<String>",
+      usage = "Comma separated bloom filter columns to be created (non-batch load).")
+  private String _bloomFilterColumns;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"},
       usage = "Print this message.")
   private boolean _help = false;
@@ -132,7 +136,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
           @Override
           public void run() {
             try {
-              loadTable(driver, _dataDir, tableName, null);
+              loadTable(driver, _dataDir, tableName, null, null);
             } catch (Exception e) {
               LOGGER.error("Caught exception while loading table: {}", tableName, e);
             }
@@ -146,14 +150,18 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
       if (_invertedIndexColumns != null) {
         invertedIndexColumns = Arrays.asList(_invertedIndexColumns.split(","));
       }
+      List<String> bloomFilterColumns = null;
+      if (_bloomFilterColumns != null) {
+        bloomFilterColumns = Arrays.asList(_bloomFilterColumns.split(","));
+      }
       for (String tableName : _tableNames.split(",")) {
-        loadTable(driver, _dataDir, tableName, invertedIndexColumns);
+        loadTable(driver, _dataDir, tableName, invertedIndexColumns, bloomFilterColumns);
       }
     }
   }
 
   public static void loadTable(PerfBenchmarkDriver driver, String dataDir, String tableName,
-      List<String> invertedIndexColumns)
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns)
       throws Exception {
     boolean tableConfigured = false;
     File[] segments = new File(dataDir, tableName).listFiles();
@@ -161,7 +169,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
     for (File segment : segments) {
       SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segment);
       if (!tableConfigured) {
-        driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns);
+        driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns, bloomFilterColumns);
         tableConfigured = true;
       }
       driver.addSegment(segmentMetadata);
diff --git a/pom.xml b/pom.xml
index 81a6dbb..4d6075e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -874,6 +874,7 @@
           <configuration>
             <source>${jdk.version}</source>
             <target>${jdk.version}</target>
+            <compilerVersion>${jdk.version}</compilerVersion>
           </configuration>
         </plugin>
         <plugin>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org