You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/01 02:52:07 UTC
[incubator-pinot] branch master updated: Adding support for bloom
filter (#3528)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 75a4f85 Adding support for bloom filter (#3528)
75a4f85 is described below
commit 75a4f8541d26043d22a11fa8ef7e6c40a9a7589b
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Fri Nov 30 18:52:01 2018 -0800
Adding support for bloom filter (#3528)
* Adding support for bloom filter
* Fixing failing test cases
* Updated bloom filter implementation
1. Changed the implementation to use Guava BloomFilter library
2. Added unit test for bloom filter
3. Enforcing a maximum bloom filter size of 1MB
* Adding bloom filter to integration tests
---
.../pinot/common/config/IndexingConfig.java | 12 ++
.../linkedin/pinot/common/config/TableConfig.java | 7 +
.../pinot/common/segment/SegmentMetadata.java | 2 +
.../com/linkedin/pinot/core/bloom/BloomFilter.java | 69 +++++++++
.../linkedin/pinot/core/bloom/BloomFilterType.java | 50 +++++++
.../linkedin/pinot/core/bloom/BloomFilterUtil.java | 54 +++++++
.../pinot/core/bloom/GuavaOnHeapBloomFilter.java | 74 ++++++++++
.../core/bloom/SegmentBloomFilterFactory.java | 48 +++++++
.../com/linkedin/pinot/core/common/DataSource.java | 3 +
.../indexsegment/mutable/MutableSegmentImpl.java | 5 +-
.../core/query/pruner/AbstractSegmentPruner.java | 17 ++-
.../query/pruner/ColumnValueSegmentPruner.java | 44 ++++--
.../core/query/pruner/PartitionSegmentPruner.java | 10 +-
.../core/segment/creator/impl/V1Constants.java | 1 +
.../creator/impl/bloom/BloomFilterCreator.java | 66 +++++++++
.../core/segment/index/SegmentMetadataImpl.java | 5 +
.../segment/index/column/ColumnIndexContainer.java | 3 +
.../index/column/PhysicalColumnIndexContainer.java | 21 +++
.../index/data/source/ColumnDataSource.java | 17 ++-
.../segment/index/loader/IndexLoadingConfig.java | 17 +++
.../segment/index/loader/SegmentPreProcessor.java | 7 +-
.../loader/bloomfilter/BloomFilterHandler.java | 160 +++++++++++++++++++++
.../segment/index/readers/BloomFilterReader.java | 54 +++++++
.../core/segment/store/ColumnIndexDirectory.java | 18 ++-
.../pinot/core/segment/store/ColumnIndexType.java | 3 +-
.../core/segment/store/FilePerIndexDirectory.java | 17 +++
.../segment/store/SegmentLocalFSDirectory.java | 5 +
.../segment/store/SingleFileIndexDirectory.java | 12 ++
.../virtualcolumn/VirtualColumnIndexContainer.java | 6 +
.../v2/store/StarTreeDimensionDataSource.java | 6 +
.../v2/store/StarTreeMetricDataSource.java | 6 +
.../core/common/RealtimeNoDictionaryTest.java | 8 +-
.../index/creator/BloomFilterCreatorTest.java | 150 +++++++++++++++++++
.../store/ColumnIndexDirectoryTestHelper.java | 15 ++
.../query/pruner/ColumnValueSegmentPrunerTest.java | 4 +-
.../tests/BaseClusterIntegrationTest.java | 6 +
.../pinot/integration/tests/ClusterTest.java | 26 ++--
...mentBuildPushOfflineClusterIntegrationTest.java | 2 +-
.../tests/HybridClusterIntegrationTest.java | 4 +-
...ridClusterIntegrationTestCommandLineRunner.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 39 ++++-
.../tests/RealtimeClusterIntegrationTest.java | 4 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 6 +-
.../linkedin/pinot/perf/PerfBenchmarkRunner.java | 71 ---------
.../com/linkedin/pinot/perf/PerfBenchmarkTest.java | 77 ----------
.../pinot/tools/perf/PerfBenchmarkDriver.java | 6 +-
.../pinot/tools/perf/PerfBenchmarkRunner.java | 16 ++-
pom.xml | 1 +
48 files changed, 1044 insertions(+), 212 deletions(-)
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
index b57825c..35de1e3 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java
@@ -43,6 +43,9 @@ public class IndexingConfig {
@ConfigKey("sortedColumn")
private List<String> _sortedColumn = new ArrayList<>();
+ @ConfigKey("bloomFilterColumns")
+ private List<String> _bloomFilterColumns = new ArrayList<>();
+
@ConfigKey("loadMode")
private String _loadMode;
@@ -113,6 +116,15 @@ public class IndexingConfig {
_sortedColumn = sortedColumn;
}
+
+ /**
+ * Returns the columns configured to have a bloom filter.
+ *
+ * @return bloom filter column names
+ */
+ public List<String> getBloomFilterColumns() {
+ return _bloomFilterColumns;
+ }
+
+ /**
+ * Sets the columns configured to have a bloom filter.
+ * NOTE(review): TableConfig.Builder forwards its own field unchanged, so this may be set to null when the builder's
+ * setter was never called; callers of the getter should be prepared for that.
+ *
+ * @param bloomFilterColumns bloom filter column names (stored by reference, no defensive copy)
+ */
+ public void setBloomFilterColumns(List<String> bloomFilterColumns) {
+ _bloomFilterColumns = bloomFilterColumns;
+ }
+
public String getLoadMode() {
return _loadMode;
}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..373f369 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -394,6 +394,7 @@ public class TableConfig {
private List<String> _invertedIndexColumns;
private List<String> _noDictionaryColumns;
private List<String> _onHeapDictionaryColumns;
+ private List<String> _bloomFilterColumns;
private Map<String, String> _streamConfigs;
private String _streamPartitionAssignmentStrategy = DEFAULT_STREAM_PARTITION_ASSIGNMENT_STRATEGY;
@@ -508,6 +509,11 @@ public class TableConfig {
return this;
}
+ /**
+ * Records the columns that should get a bloom filter; the value is later handed to
+ * IndexingConfig#setBloomFilterColumns as-is.
+ *
+ * @param bloomFilterColumns bloom filter column names (stored by reference)
+ * @return this builder, for call chaining
+ */
+ public Builder setBloomFilterColumns(List<String> bloomFilterColumns) {
+ this._bloomFilterColumns = bloomFilterColumns;
+ return this;
+ }
+
public Builder setNoDictionaryColumns(List<String> noDictionaryColumns) {
_noDictionaryColumns = noDictionaryColumns;
return this;
@@ -583,6 +589,7 @@ public class TableConfig {
indexingConfig.setNoDictionaryColumns(_noDictionaryColumns);
indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns);
indexingConfig.setStreamConfigs(_streamConfigs);
+ indexingConfig.setBloomFilterColumns(_bloomFilterColumns);
StreamConsumptionConfig streamConsumptionConfig = new StreamConsumptionConfig();
streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy);
indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig);
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
index 3a2d1bb..b1e4962 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java
@@ -82,6 +82,8 @@ public interface SegmentMetadata {
String getBitmapInvertedIndexFileName(String column);
+ /**
+ * Returns the name of the bloom filter file for the given column.
+ * NOTE(review): presumably the column name plus V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION (".bloom"), matching
+ * what BloomFilterCreator writes; confirm against the implementation.
+ *
+ * @param column column name
+ * @return bloom filter file name for the column
+ */
+ String getBloomFilterFileName(String column);
+
String getCreatorName();
char getPaddingCharacter();
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java
new file mode 100644
index 0000000..9c9e62c
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+
+/**
+ * Interface for bloom filter
+ */
+public interface BloomFilter {
+
+ /**
+ * @return the version number of this bloom filter implementation
+ */
+ int getVersion();
+
+ /**
+ * @return the {@link BloomFilterType} that identifies this implementation
+ */
+ BloomFilterType getBloomFilterType();
+
+ /**
+ * Records an element in the filter.
+ *
+ * @param input element to record
+ */
+ void add(Object input);
+
+ /**
+ * Tests whether an element may have been recorded. A {@code false} answer is definitive, while a {@code true}
+ * answer can be a false positive.
+ *
+ * @param input element to look up
+ * @return {@code false} if the element definitely does not exist, {@code true} if it may exist
+ */
+ boolean mightContain(Object input);
+
+ /**
+ * Serializes the filter into the given stream.
+ *
+ * @param out destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ void writeTo(OutputStream out) throws IOException;
+
+ /**
+ * Deserializes the filter from the given stream, as written by {@link #writeTo(OutputStream)}.
+ *
+ * @param in source stream
+ * @throws IOException if reading from the stream fails
+ */
+ void readFrom(InputStream in) throws IOException;
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java
new file mode 100644
index 0000000..75c9b1e
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterType.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Enum for bloom filter type
+ */
+public enum BloomFilterType {
+ // NOTE: Never change the value of an existing type (e.g. when adding a new one): the value is written when
+ // serializing and checked when deserializing a bloom filter.
+ GUAVA_ON_HEAP(1);
+
+ // Reverse lookup from serialized value to type; filled once by the static initializer below and never modified after.
+ private static final Map<Integer, BloomFilterType> VALUE_TO_TYPE_MAP = new HashMap<>();
+
+ static {
+ for (BloomFilterType type : BloomFilterType.values()) {
+ VALUE_TO_TYPE_MAP.put(type._value, type);
+ }
+ }
+
+ // Serialized identifier of this type; final because it is persisted alongside the filter.
+ private final int _value;
+
+ BloomFilterType(int value) {
+ _value = value;
+ }
+
+ /**
+ * Looks up the bloom filter type for a serialized value.
+ *
+ * @param value serialized type value, as returned by {@link #getValue()}
+ * @return the matching type, or {@code null} if no type has that value
+ */
+ public static BloomFilterType valueOf(int value) {
+ return VALUE_TO_TYPE_MAP.get(value);
+ }
+
+ /**
+ * @return the serialized value of this type
+ */
+ public int getValue() {
+ return _value;
+ }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java
new file mode 100644
index 0000000..7e240b0
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/BloomFilterUtil.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+/**
+ * Util class for bloom filter
+ */
+public class BloomFilterUtil {
+ // ln(1 / 2^ln(2)) == -(ln 2)^2, the constant denominator of the optimal-size formula. Kept in this exact form (rather
+ // than -(ln2 * ln2)) so the computed bit counts round exactly as before; hoisted so it is computed only once.
+ private static final double NEGATIVE_LN2_SQUARED = Math.log(1.0 / Math.pow(2.0, Math.log(2.0)));
+
+ private BloomFilterUtil() {
+ }
+
+ /**
+ * Computes the number of bits needed for {@code cardinality} elements at the given false positive probability,
+ * i.e. {@code m = ceil(-n * ln(p) / (ln 2)^2)}.
+ *
+ * @param cardinality expected number of distinct elements (n)
+ * @param maxFalsePosProbability target false positive probability (p)
+ * @return the number of bits (m)
+ */
+ public static long computeNumBits(long cardinality, double maxFalsePosProbability) {
+ return (long) Math.ceil((cardinality * Math.log(maxFalsePosProbability)) / NEGATIVE_LN2_SQUARED);
+ }
+
+ /**
+ * Computes the optimal number of hash functions, i.e. {@code k = max(1, round(m / n * ln 2))}.
+ *
+ * @param cardinality expected number of distinct elements (n); a value {@code <= 0} yields 1
+ * @param numBits number of bits in the filter (m)
+ * @return the number of hash functions (k), always at least 1
+ */
+ public static int computeNumberOfHashFunctions(long cardinality, long numBits) {
+ if (cardinality <= 0) {
+ // Dividing by a zero cardinality would produce Infinity and saturate to Integer.MAX_VALUE hash functions
+ return 1;
+ }
+ return (int) Math.max(1.0, Math.round(((double) numBits / cardinality) * Math.log(2.0)));
+ }
+
+ /**
+ * Computes the false positive probability of a filter, i.e. {@code p = (1 - e^(-k * n / m))^k}.
+ *
+ * @param cardinality number of distinct elements inserted (n)
+ * @param numHashFunction number of hash functions (k)
+ * @param numBits number of bits in the filter (m)
+ * @return the expected false positive probability
+ */
+ public static double computeMaxFalsePosProbability(long cardinality, int numHashFunction, long numBits) {
+ return Math.pow(1.0 - Math.exp(-1.0 * numHashFunction / ((double) numBits / cardinality)), numHashFunction);
+ }
+
+ /**
+ * Returns the false positive probability to build a filter with so that it does not exceed {@code maxNumBits}.
+ *
+ * <p>If {@code defaultMaxFalsePosProbability} can be achieved within {@code maxNumBits}, it is returned unchanged.
+ * Otherwise the probability achievable with exactly {@code maxNumBits} bits (and the optimal number of hash functions
+ * for that size) is returned.
+ *
+ * @param cardinality expected number of distinct elements
+ * @param maxNumBits upper bound on the filter size, in bits
+ * @param defaultMaxFalsePosProbability preferred false positive probability
+ * @return the false positive probability to use
+ */
+ public static double computeMaxFalsePositiveProbabilityForNumBits(long cardinality, long maxNumBits,
+ double defaultMaxFalsePosProbability) {
+ // Number of bits required to achieve the default false positive probability
+ long numBitsRequired = computeNumBits(cardinality, defaultMaxFalsePosProbability);
+
+ // The filter fits within maxNumBits: keep the default false positive probability
+ if (numBitsRequired <= maxNumBits) {
+ return defaultMaxFalsePosProbability;
+ }
+
+ // The filter would exceed maxNumBits: use the best false positive probability achievable within that limit
+ int numHashFunction = computeNumberOfHashFunctions(cardinality, maxNumBits);
+ return computeMaxFalsePosProbability(cardinality, numHashFunction, maxNumBits);
+ }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java
new file mode 100644
index 0000000..881803d
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/GuavaOnHeapBloomFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import com.google.common.hash.Funnels;
+
+
+/**
+ * Bloom filter implementation with guava library
+ */
+public class GuavaOnHeapBloomFilter implements BloomFilter {
+ // Increment the version when the bloom filter implementation becomes backward incompatible
+ private static final int VERSION = 1;
+
+ // Elements are stored via their toString() form encoded as UTF-8. A single shared funnel guarantees that creation
+ // and deserialization always use the same encoding.
+ private static final com.google.common.hash.Funnel<CharSequence> FUNNEL =
+ Funnels.stringFunnel(Charset.forName("UTF-8"));
+
+ // Null until initialized by the sizing constructor or by readFrom()
+ private com.google.common.hash.BloomFilter<CharSequence> _bloomFilter;
+
+ /**
+ * Creates an uninitialized instance; {@link #readFrom(InputStream)} must be called before any other operation.
+ */
+ public GuavaOnHeapBloomFilter() {
+ }
+
+ /**
+ * Creates an empty bloom filter sized for the given number of distinct values.
+ *
+ * @param cardinality expected number of distinct values
+ * @param maxFalsePosProbability target false positive probability
+ */
+ public GuavaOnHeapBloomFilter(int cardinality, double maxFalsePosProbability) {
+ _bloomFilter = com.google.common.hash.BloomFilter.create(FUNNEL, cardinality, maxFalsePosProbability);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public BloomFilterType getBloomFilterType() {
+ return BloomFilterType.GUAVA_ON_HEAP;
+ }
+
+ @Override
+ public void add(Object input) {
+ getDelegate().put(input.toString());
+ }
+
+ @Override
+ public boolean mightContain(Object input) {
+ return getDelegate().mightContain(input.toString());
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ getDelegate().writeTo(out);
+ }
+
+ @Override
+ public void readFrom(InputStream in) throws IOException {
+ _bloomFilter = com.google.common.hash.BloomFilter.readFrom(in, FUNNEL);
+ }
+
+ // Returns the underlying Guava filter, failing fast with a descriptive error if it was never initialized.
+ private com.google.common.hash.BloomFilter<CharSequence> getDelegate() {
+ if (_bloomFilter == null) {
+ throw new IllegalStateException("Bloom filter is not initialized; call readFrom() first");
+ }
+ return _bloomFilter;
+ }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java
new file mode 100644
index 0000000..c8c580c
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/bloom/SegmentBloomFilterFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.bloom;
+
+/**
+ * Factory for bloom filter
+ */
+public class SegmentBloomFilterFactory {
+
+ /**
+ * Factory used when creating a new bloom filter.
+ *
+ * @param cardinality cardinality of the column
+ * @param maxFalsePosProbability maximum false positive probability
+ * @return a new, empty bloom filter (currently always the Guava on-heap implementation)
+ */
+ public static BloomFilter createSegmentBloomFilter(int cardinality, double maxFalsePosProbability) {
+ // TODO: when we add more types of bloom filter, we will need to add a new config and wire in here
+ return new GuavaOnHeapBloomFilter(cardinality, maxFalsePosProbability);
+ }
+
+ /**
+ * Factory used when deserializing a bloom filter.
+ *
+ * @param type a bloom filter type; may be {@code null} when {@link BloomFilterType#valueOf(int)} did not recognize
+ * the serialized type value
+ * @return an empty bloom filter of the given type, to be populated via readFrom()
+ * @throws IllegalArgumentException if the type is {@code null} or not supported
+ */
+ public static BloomFilter createSegmentBloomFilter(BloomFilterType type) {
+ // A switch on a null enum throws a bare NullPointerException, so reject null with a descriptive error first
+ if (type == null) {
+ throw new IllegalArgumentException("Invalid bloom filter type: null");
+ }
+ switch (type) {
+ case GUAVA_ON_HEAP:
+ return new GuavaOnHeapBloomFilter();
+ default:
+ throw new IllegalArgumentException("Invalid bloom filter type: " + type);
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
index b44d721..6397024 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.core.common;
import com.linkedin.pinot.core.operator.BaseOperator;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
@@ -26,4 +27,6 @@ public abstract class DataSource extends BaseOperator {
public abstract InvertedIndexReader getInvertedIndex();
public abstract Dictionary getDictionary();
+
+ /**
+ * Returns the bloom filter reader for this data source's column.
+ * NOTE(review): implementations may return null when no bloom filter exists for the column (the mutable segment
+ * looks it up in a map that may not contain the column); callers should null-check — confirm per implementation.
+ *
+ * @return the bloom filter reader, or possibly null if none is available
+ */
+ public abstract BloomFilterReader getBloomFilter();
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index a32068a..9bd9de5 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,6 +35,7 @@ import com.linkedin.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndex
import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
import com.linkedin.pinot.core.segment.index.data.source.ColumnDataSource;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnContext;
import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -79,6 +80,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
private final Map<String, Integer> _maxNumValuesMap = new HashMap<>();
private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
+ private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
private boolean _aggregateMetrics;
@@ -386,7 +388,8 @@ public class MutableSegmentImpl implements MutableSegment {
public ColumnDataSource getDataSource(String columnName) {
if (!_schema.isVirtualColumn(columnName)) {
return new ColumnDataSource(_schema.getFieldSpecFor(columnName), _numDocsIndexed, _maxNumValuesMap.get(columnName),
- _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName));
+ _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName),
+ _bloomFilterMap.get(columnName));
} else {
return getVirtualDataSource(columnName);
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
index 258fae9..7d61461 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java
@@ -15,13 +15,15 @@
*/
package com.linkedin.pinot.core.query.pruner;
-import java.util.List;
-import java.util.Map;
import com.linkedin.pinot.common.data.FieldSpec;
import com.linkedin.pinot.common.request.FilterOperator;
import com.linkedin.pinot.common.utils.request.FilterQueryTree;
import com.linkedin.pinot.core.query.exception.BadQueryRequestException;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.List;
+import java.util.Map;
import javax.annotation.Nonnull;
@@ -32,7 +34,8 @@ import javax.annotation.Nonnull;
*/
public abstract class AbstractSegmentPruner implements SegmentPruner {
- public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap);
+ public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap,
+ Map<String, BloomFilterReader> bloomFilterMap);
/**
* Given a non leaf filter query tree node prunes it as follows:
@@ -46,8 +49,8 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
*
* @return True to prune, false otherwise
*/
- protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree,
- @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+ protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree, @Nonnull Map<String, ColumnMetadata> columnMetadataMap,
+ Map<String, BloomFilterReader> bloomFilterMap) {
List<FilterQueryTree> children = filterQueryTree.getChildren();
if (children.isEmpty()) {
@@ -58,7 +61,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
switch (filterOperator) {
case AND:
for (FilterQueryTree child : children) {
- if (pruneSegment(child, columnMetadataMap)) {
+ if (pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
return true;
}
}
@@ -66,7 +69,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
case OR:
for (FilterQueryTree child : children) {
- if (!pruneSegment(child, columnMetadataMap)) {
+ if (!pruneSegment(child, columnMetadataMap, bloomFilterMap)) {
return false;
}
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index a2953ce..e94af5c 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -23,8 +23,13 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
import com.linkedin.pinot.core.query.request.ServerQueryRequest;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import javax.annotation.Nonnull;
import org.apache.commons.configuration.Configuration;
@@ -48,7 +53,17 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
// For realtime segment, this map can be null.
Map<String, ColumnMetadata> columnMetadataMap =
((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap();
- return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap);
+
+ Map<String, BloomFilterReader> bloomFilterMap = new HashMap<>();
+ if (columnMetadataMap != null) {
+ for (String column : columnMetadataMap.keySet()) {
+ BloomFilterReader bloomFilterReader = segment.getDataSource(column).getBloomFilter();
+ if (bloomFilterReader != null) {
+ bloomFilterMap.put(column, bloomFilterReader);
+ }
+ }
+ }
+ return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, bloomFilterMap);
}
@Override
@@ -74,7 +89,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
@SuppressWarnings("unchecked")
@Override
public boolean pruneSegment(@Nonnull FilterQueryTree filterQueryTree,
- @Nonnull Map<String, ColumnMetadata> columnMetadataMap) {
+ @Nonnull Map<String, ColumnMetadata> columnMetadataMap, Map<String, BloomFilterReader> bloomFilterMap) {
FilterOperator filterOperator = filterQueryTree.getOperator();
List<FilterQueryTree> children = filterQueryTree.getChildren();
@@ -86,7 +101,8 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
return false;
}
- ColumnMetadata columnMetadata = columnMetadataMap.get(filterQueryTree.getColumn());
+ String column = filterQueryTree.getColumn();
+ ColumnMetadata columnMetadata = columnMetadataMap.get(column);
if (columnMetadata == null) {
// Should not reach here after DataSchemaSegmentPruner
return true;
@@ -97,16 +113,22 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
if (filterOperator == FilterOperator.EQUALITY) {
// EQUALITY
+ boolean pruneSegment = false;
+ FieldSpec.DataType dataType = columnMetadata.getDataType();
+ Comparable value = getValue(filterQueryTree.getValue().get(0), dataType);
- // Doesn't have min/max value set in metadata
- if ((minValue == null) || (maxValue == null)) {
- return false;
+ // Check if the value is in the min/max range
+ if (minValue != null && maxValue != null) {
+ pruneSegment = (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0);
}
- // Check if the value is in the min/max range
- FieldSpec.DataType dataType = columnMetadata.getDataType();
- Comparable value = getValue(filterQueryTree.getValue().get(0), dataType);
- return (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0);
+ // If the bloom filter is available for the column, check if the value may exist
+ if (!pruneSegment && bloomFilterMap.containsKey(column)) {
+ BloomFilterReader bloomFilterReader = bloomFilterMap.get(column);
+ pruneSegment = !bloomFilterReader.mightContain(value);
+ }
+
+ return pruneSegment;
} else {
// RANGE
@@ -170,7 +192,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner {
}
} else {
// Parent node
- return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+ return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
}
}
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
index f8e1cd9..5f35b7f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java
@@ -22,6 +22,9 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment;
import com.linkedin.pinot.core.query.request.ServerQueryRequest;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
@@ -59,7 +62,7 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner {
Map<String, ColumnMetadata> columnMetadataMap =
((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap();
- return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap);
+ return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, Collections.emptyMap());
}
/**
@@ -74,12 +77,13 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner {
* @return True if segment can be pruned, false otherwise
*/
@Override
- public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap) {
+ public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap,
+ Map<String, BloomFilterReader> bloomFilterMap) {
List<FilterQueryTree> children = filterQueryTree.getChildren();
// Non-leaf node
if (children != null && !children.isEmpty()) {
- return pruneNonLeaf(filterQueryTree, columnMetadataMap);
+ return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap);
}
// TODO: Enhance partition based pruning for RANGE operator.
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
index 772d5a3..c3facda 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java
@@ -57,6 +57,7 @@ public class V1Constants {
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+ public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
}
public static class MetadataKeys {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
new file mode 100644
index 0000000..c7d1293
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.creator.impl.bloom;
+
+import com.linkedin.pinot.core.bloom.BloomFilter;
+import com.linkedin.pinot.core.bloom.BloomFilterUtil;
+import com.linkedin.pinot.core.bloom.SegmentBloomFilterFactory;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Creates the on-disk bloom filter for a single column of a segment.
+ * <p>
+ * Values are added through {@link #add(Object)} by their {@code toString()} form, and the filter is written to the
+ * {@code <columnName>.bloom} file under the index directory when {@link #close()} is called. File layout: bloom filter
+ * type (int), bloom filter version (int), followed by the serialized filter.
+ * <p>
+ * Note:
+ * 1. Currently, we limit the filter size to 1MB to avoid the heap overhead. We can remove it once we have the off-heap
+ *    implementation of the bloom filter.
+ * 2. When capping the bloom filter to 1MB, the max false positive probability grows steeply once the cardinality
+ *    exceeds 1 million. If the column has more than 5 million distinct values, using a bloom filter is not
+ *    recommended since the max false positive probability is already 0.45 when the filter size is 1MB.
+ */
+public class BloomFilterCreator implements AutoCloseable {
+  // Default max false positive probability handed to BloomFilterUtil when sizing the filter.
+  private static final double DEFAULT_MAX_FALSE_POS_PROBABILITY = 0.05;
+  // 1MB expressed in bits (1024 * 1024 * 8): the size cap for the filter.
+  private static final int MB_IN_BITS = 8388608;
+
+  private final BloomFilter _bloomFilter;
+  private final File _bloomFilterFile;
+
+  /**
+   * @param indexDir directory in which the bloom filter file is created
+   * @param columnName column name, used to derive the bloom filter file name
+   * @param cardinality number of distinct values of the column, used to size the filter
+   */
+  public BloomFilterCreator(File indexDir, String columnName, int cardinality) {
+    _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    double maxFalsePosProbability =
+        BloomFilterUtil.computeMaxFalsePositiveProbabilityForNumBits(cardinality, MB_IN_BITS,
+            DEFAULT_MAX_FALSE_POS_PROBABILITY);
+    _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(cardinality, maxFalsePosProbability);
+  }
+
+  /**
+   * Adds a value to the bloom filter. The value is stored by its {@code toString()} representation, which is also
+   * what {@code BloomFilterReader#mightContain(Object)} looks up.
+   */
+  public void add(Object input) {
+    _bloomFilter.add(input.toString());
+  }
+
+  /**
+   * Writes the bloom filter file: type (int), version (int), then the serialized filter.
+   */
+  @Override
+  public void close() throws IOException {
+    // Buffered: DataOutputStream issues at least one write call per primitive it writes.
+    try (DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(_bloomFilterFile)))) {
+      outputStream.writeInt(_bloomFilter.getBloomFilterType().getValue());
+      outputStream.writeInt(_bloomFilter.getVersion());
+      _bloomFilter.writeTo(outputStream);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
index 4b1c8af..5d8f164 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -555,6 +555,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
return column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
}
+ @Override
+ public String getBloomFilterFileName(String column) {
+ return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
+ }
+
@Nullable
@Override
public String getCreatorName() {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
index 8850ed2..6e4e515 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.core.segment.index.column;
import com.linkedin.pinot.core.io.reader.DataFileReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
@@ -39,4 +40,6 @@ public interface ColumnIndexContainer {
* Returns the dictionary for the column, or {@code null} if it does not exist.
*/
Dictionary getDictionary();
+
+ BloomFilterReader getBloomFilter(); // May return null when no bloom filter is available for the column.
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index fb78751..29ef748 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -27,6 +27,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
import com.linkedin.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.BytesDictionary;
import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
@@ -44,6 +45,7 @@ import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
import com.linkedin.pinot.core.segment.store.ColumnIndexType;
import com.linkedin.pinot.core.segment.store.SegmentDirectory;
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,18 +56,29 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final DataFileReader _forwardIndex;
private final InvertedIndexReader _invertedIndex;
private final ImmutableDictionaryReader _dictionary;
+ private final BloomFilterReader _bloomFilterReader;
public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
IndexLoadingConfig indexLoadingConfig) throws IOException {
String columnName = metadata.getColumnName();
boolean loadInvertedIndex = false;
boolean loadOnHeapDictionary = false;
+ boolean loadBloomFilter = false;
if (indexLoadingConfig != null) {
loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
+ loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
}
PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
+
if (metadata.hasDictionary()) {
+ //bloom filter
+ if (loadBloomFilter) {
+ PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
+ _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer);
+ } else {
+ _bloomFilterReader = null;
+ }
// Dictionary-based index
_dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
loadOnHeapDictionary);
@@ -100,6 +113,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
_invertedIndex = null;
_dictionary = null;
+ _bloomFilterReader = null;
}
}
@@ -118,6 +132,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
return _dictionary;
}
+ @Override
+ public BloomFilterReader getBloomFilter() {
+ return _bloomFilterReader;
+ }
+
+
private static ImmutableDictionaryReader loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
boolean loadOnHeap) {
FieldSpec.DataType dataType = metadata.getDataType();
@@ -175,4 +195,5 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
}
}
+
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
index 16c79f4..6f89e71 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -30,6 +30,7 @@ import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
import com.linkedin.pinot.core.realtime.impl.dictionary.MutableDictionary;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
@@ -44,6 +45,7 @@ public final class ColumnDataSource extends DataSource {
private final DataFileReader _forwardIndex;
private final InvertedIndexReader _invertedIndex;
private final Dictionary _dictionary;
+ private final BloomFilterReader _bloomFilter;
private final int _cardinality;
private final DataSourceMetadata _metadata;
@@ -53,21 +55,22 @@ public final class ColumnDataSource extends DataSource {
public ColumnDataSource(ColumnIndexContainer indexContainer, ColumnMetadata metadata) {
this(metadata.getColumnName(), metadata.getDataType(), metadata.isSingleValue(), metadata.isSorted(),
metadata.getTotalDocs(), metadata.getMaxNumberOfMultiValues(), indexContainer.getForwardIndex(),
- indexContainer.getInvertedIndex(), indexContainer.getDictionary(), metadata.getCardinality());
+ indexContainer.getInvertedIndex(), indexContainer.getDictionary(), indexContainer.getBloomFilter(),
+ metadata.getCardinality());
}
/**
* For REALTIME segment.
*/
public ColumnDataSource(FieldSpec fieldSpec, int numDocs, int maxNumMultiValues, DataFileReader forwardIndex,
- InvertedIndexReader invertedIndex, MutableDictionary dictionary) {
+ InvertedIndexReader invertedIndex, MutableDictionary dictionary, BloomFilterReader bloomFilter) {
this(fieldSpec.getName(), fieldSpec.getDataType(), fieldSpec.isSingleValueField(), false, numDocs,
- maxNumMultiValues, forwardIndex, invertedIndex, dictionary, Constants.UNKNOWN_CARDINALITY);
+ maxNumMultiValues, forwardIndex, invertedIndex, dictionary, bloomFilter, Constants.UNKNOWN_CARDINALITY);
}
private ColumnDataSource(String columnName, FieldSpec.DataType dataType, boolean isSingleValue, boolean isSorted,
int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, InvertedIndexReader invertedIndex,
- Dictionary dictionary, int cardinality) {
+ Dictionary dictionary, BloomFilterReader bloomFilterReader, int cardinality) {
// Sanity check
if (isSingleValue) {
Preconditions.checkState(forwardIndex instanceof SingleColumnSingleValueReader);
@@ -93,6 +96,7 @@ public final class ColumnDataSource extends DataSource {
_forwardIndex = forwardIndex;
_invertedIndex = invertedIndex;
_dictionary = dictionary;
+ _bloomFilter = bloomFilterReader;
_cardinality = cardinality;
_metadata = new DataSourceMetadata() {
@@ -154,6 +158,11 @@ public final class ColumnDataSource extends DataSource {
}
@Override
+ public BloomFilterReader getBloomFilter() {
+ return _bloomFilter;
+ }
+
+ @Override
protected Block getNextBlock() {
if (_isSingleValue) {
return new SingleValueBlock((SingleColumnSingleValueReader) _forwardIndex, _numDocs, _dataType, _dictionary);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6381014..7c54c90 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -44,6 +44,8 @@ public class IndexLoadingConfig {
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
private Map<String, String> _noDictionaryConfig = new HashMap<>();
private Set<String> _onHeapDictionaryColumns = new HashSet<>();
+ private Set<String> _bloomFilterColumns = new HashSet<>();
+
private SegmentVersion _segmentVersion;
// This value will remain true only when the empty constructor is invoked.
private boolean _enableDefaultColumns = true;
@@ -76,6 +78,11 @@ public class IndexLoadingConfig {
_invertedIndexColumns.addAll(invertedIndexColumns);
}
+ List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
+ if (bloomFilterColumns != null) {
+ _bloomFilterColumns.addAll(bloomFilterColumns);
+ }
+
List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
if (noDictionaryColumns != null) {
_noDictionaryColumns.addAll(noDictionaryColumns);
@@ -164,6 +171,12 @@ public class IndexLoadingConfig {
}
@VisibleForTesting
+ public void setBloomFilterColumns(@Nonnull Set<String> bloomFilterColumns) {
+ _bloomFilterColumns = bloomFilterColumns;
+ }
+
+
+ @VisibleForTesting
public void setOnHeapDictionaryColumns(@Nonnull Set<String> onHeapDictionaryColumns) {
_onHeapDictionaryColumns = onHeapDictionaryColumns;
}
@@ -183,6 +196,10 @@ public class IndexLoadingConfig {
return _onHeapDictionaryColumns;
}
+ public Set<String> getBloomFilterColumns() {
+ return _bloomFilterColumns;
+ }
+
@Nullable
public SegmentVersion getSegmentVersion() {
return _segmentVersion;
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 6cf064d..7ee16ba 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -19,6 +19,7 @@ import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.segment.ReadMode;
import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.loader.bloomfilter.BloomFilterHandler;
import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator;
import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import com.linkedin.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
@@ -63,7 +64,6 @@ public class SegmentPreProcessor implements AutoCloseable {
if (_segmentMetadata.getTotalDocs() == 0) {
return;
}
-
// Remove all the existing inverted index temp files before loading segments.
// NOTE: This step fixes the issue of temporary files not getting deleted after creating new inverted indexes.
// In this, we look for all files in the directory and remove the ones with '.bitmap.inv.tmp' extension.
@@ -91,6 +91,11 @@ public class SegmentPreProcessor implements AutoCloseable {
new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
invertedIndexHandler.createInvertedIndices();
+ // Create bloom filter if required
+ BloomFilterHandler bloomFilterHandler =
+ new BloomFilterHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+ bloomFilterHandler.createBloomFilters();
+
// Add min/max value to column metadata according to the prune mode.
// For star-tree index, because it can only increase the range, so min/max value can still be used in pruner.
ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode =
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
new file mode 100644
index 0000000..2f6991e
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.loader.bloomfilter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.pinot.common.data.FieldSpec.DataType;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig;
+import com.linkedin.pinot.core.segment.index.loader.LoaderUtils;
+import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary;
+import com.linkedin.pinot.core.segment.index.readers.FloatDictionary;
+import com.linkedin.pinot.core.segment.index.readers.ImmutableDictionaryReader;
+import com.linkedin.pinot.core.segment.index.readers.IntDictionary;
+import com.linkedin.pinot.core.segment.index.readers.LongDictionary;
+import com.linkedin.pinot.core.segment.index.readers.StringDictionary;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+import com.linkedin.pinot.core.segment.store.ColumnIndexType;
+import com.linkedin.pinot.core.segment.store.SegmentDirectory;
+
+
+/**
+ * Creates bloom filters during segment pre-processing for the columns listed in
+ * {@link IndexLoadingConfig#getBloomFilterColumns()}.
+ * <p>
+ * Bloom filters are built from the column dictionary, so only dictionary-encoded columns are handled; other columns
+ * are skipped. A {@code <column>.bloom.inprogress} marker file is created before building and deleted afterwards, so
+ * a run that was interrupted half-way is detected and the bloom filter is rebuilt on the next load.
+ */
+public class BloomFilterHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterHandler.class);
+  private static final String BLOOM_FILTER_IN_PROGRESS_SUFFIX = ".bloom.inprogress";
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>();
+
+  /**
+   * Collects the metadata of the configured bloom filter columns; columns not present in the segment are ignored.
+   */
+  public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
+      @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    for (String column : indexLoadingConfig.getBloomFilterColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null) {
+        _bloomFilterColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  /**
+   * Creates the bloom filter for every configured dictionary-encoded column that does not already have one.
+   */
+  public void createBloomFilters() throws Exception {
+    for (ColumnMetadata columnMetadata : _bloomFilterColumns) {
+      if (columnMetadata.hasDictionary()) {
+        createBloomFilterForColumn(columnMetadata);
+      }
+    }
+  }
+
+  /**
+   * Builds the bloom filter for one column, unless a completed one already exists.
+   */
+  private void createBloomFilterForColumn(ColumnMetadata columnMetadata) throws Exception {
+    String columnName = columnMetadata.getColumnName();
+
+    File bloomFilterFileInProgress = new File(_indexDir, columnName + BLOOM_FILTER_IN_PROGRESS_SUFFIX);
+    File bloomFilterFile = new File(_indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+    if (!bloomFilterFileInProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      if (_segmentWriter.hasIndexFor(columnName, ColumnIndexType.BLOOM_FILTER)) {
+        // Skip creating bloom filter index if already exists.
+        LOGGER.info("Found bloom filter for segment: {}, column: {}", _segmentName, columnName);
+        return;
+      }
+      // Create a marker file.
+      FileUtils.touch(bloomFilterFileInProgress);
+    } else {
+      // Marker file exists, which means the last run was interrupted: remove the possibly partial bloom filter file.
+      FileUtils.deleteQuietly(bloomFilterFile);
+    }
+
+    // Create new bloom filter for the column.
+    LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, columnName);
+    try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnName,
+        columnMetadata.getCardinality())) {
+      if (columnMetadata.hasDictionary()) {
+        // Add every dictionary value to the bloom filter.
+        try (ImmutableDictionaryReader dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) {
+          int numValues = dictionaryReader.length();
+          for (int dictId = 0; dictId < numValues; dictId++) {
+            creator.add(dictionaryReader.get(dictId));
+          }
+        }
+      } else {
+        // Building from the raw forward index is not implemented.
+        throw new UnsupportedOperationException("Bloom filters not supported for no dictionary columns");
+      }
+    }
+
+    // For v3, write the generated bloom filter file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(bloomFilterFileInProgress);
+    LOGGER.info("Created bloom filter for segment: {}, column: {}", _segmentName, columnName);
+  }
+
+  /**
+   * Opens a reader over the dictionary of the given column. The caller is responsible for closing it.
+   *
+   * @throws IllegalStateException if the column data type is not INT, LONG, FLOAT, DOUBLE or STRING
+   */
+  private ImmutableDictionaryReader getDictionaryReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter) throws IOException {
+    PinotDataBuffer dictionaryBuffer =
+        segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    int cardinality = columnMetadata.getCardinality();
+    DataType dataType = columnMetadata.getDataType();
+    switch (dataType) {
+      case INT:
+        return new IntDictionary(dictionaryBuffer, cardinality);
+      case LONG:
+        return new LongDictionary(dictionaryBuffer, cardinality);
+      case FLOAT:
+        return new FloatDictionary(dictionaryBuffer, cardinality);
+      case DOUBLE:
+        return new DoubleDictionary(dictionaryBuffer, cardinality);
+      case STRING:
+        return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
+            (byte) columnMetadata.getPaddingCharacter());
+      default:
+        throw new IllegalStateException(
+            "Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName());
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
new file mode 100644
index 0000000..349640f
--- /dev/null
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.readers;
+
+import com.linkedin.pinot.core.bloom.BloomFilter;
+import com.linkedin.pinot.core.bloom.BloomFilterType;
+import com.linkedin.pinot.core.bloom.SegmentBloomFilterFactory;
+import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+
+/**
+ * Reads a bloom filter written by {@code BloomFilterCreator}: bloom filter type (int), bloom filter version (int),
+ * followed by the serialized filter. The filter is fully deserialized on-heap by the constructor.
+ */
+public class BloomFilterReader {
+
+  private final BloomFilter _bloomFilter;
+
+  /**
+   * @param bloomFilterBuffer buffer holding the serialized bloom filter
+   * @throws IOException if the buffer cannot be parsed, or if the stored version does not match the version of the
+   *                     bloom filter implementation for the stored type
+   */
+  public BloomFilterReader(PinotDataBuffer bloomFilterBuffer) throws IOException {
+    // Copy the serialized bytes on-heap (filters written by BloomFilterCreator are capped at 1MB).
+    byte[] buffer = new byte[(int) bloomFilterBuffer.size()];
+    bloomFilterBuffer.copyTo(0, buffer);
+
+    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
+      BloomFilterType bloomFilterType = BloomFilterType.valueOf(in.readInt());
+      int version = in.readInt();
+      _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(bloomFilterType);
+      int expectedVersion = _bloomFilter.getVersion();
+      if (version != expectedVersion) {
+        throw new IOException("Unexpected bloom filter version (type: " + bloomFilterType + ", version: " + version
+            + ", expected version: " + expectedVersion + ")");
+      }
+      _bloomFilter.readFrom(in);
+    }
+  }
+
+  /**
+   * Returns {@code false} if the key is definitely not in the filter, {@code true} if it might be. The key is looked
+   * up by its {@code toString()} representation, matching how values are added by {@code BloomFilterCreator}.
+   */
+  public boolean mightContain(Object key) {
+    return _bloomFilter.mightContain(key.toString());
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
index 3579f1c..2973969 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java
@@ -93,7 +93,14 @@ abstract class ColumnIndexDirectory implements Closeable {
*/
public abstract PinotDataBuffer getInvertedIndexBufferFor(String column)
throws IOException;
-
+ /**
+ * Get bloom filter buffer for a column
+ * @param column column name
+ * @return in-memory ByteBuffer like buffer for data
+ * @throws IOException
+ */
+ public abstract PinotDataBuffer getBloomFilterBufferFor(String column)
+ throws IOException;
/**
* Allocate a new data buffer of specified sizeBytes in the columnar index directory
* @param column column name
@@ -121,6 +128,15 @@ abstract class ColumnIndexDirectory implements Closeable {
*/
public abstract PinotDataBuffer newInvertedIndexBuffer(String column, long sizeBytes)
throws IOException;
+ /**
+ * Allocate a new bloom filter buffer of specified sizeBytes in the columnar index directory
+ * @param column column name
+ * @param sizeBytes sizeBytes for the buffer allocation
+ * @return in-memory ByteBuffer like buffer for data
+ * @throws IOException
+ */
+ public abstract PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+ throws IOException;
/**
* Check if an index exists for a column
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
index 85d38f7..5f7e008 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java
@@ -18,7 +18,8 @@ package com.linkedin.pinot.core.segment.store;
public enum ColumnIndexType {
DICTIONARY("dictionary"),
FORWARD_INDEX("forward_index"),
- INVERTED_INDEX("inverted_index");
+ INVERTED_INDEX("inverted_index"),
+ BLOOM_FILTER("bloom_filter");
private final String indexName;
ColumnIndexType(String name) {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
index c6ea2d9..8599cde 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -81,6 +81,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
}
@Override
+ public PinotDataBuffer getBloomFilterBufferFor(String column)
+ throws IOException {
+ IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+ return getReadBufferFor(key);
+ }
+
+ @Override
+ public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+ throws IOException {
+ IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
+ return getWriteBufferFor(key, sizeBytes);
+ }
+
+ @Override
public boolean hasIndexFor(String column, ColumnIndexType type) {
File indexFile = getFileFor(column, type);
return indexFile.exists();
@@ -140,6 +154,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
case INVERTED_INDEX:
filename = metadata.getBitmapInvertedIndexFileName(column);
break;
+ case BLOOM_FILTER:
+ filename = metadata.getBloomFilterFileName(column);
+ break;
default:
throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
index d29c37c..3705a82 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -233,6 +233,9 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
case INVERTED_INDEX:
buffer = columnIndexDirectory.getInvertedIndexBufferFor(column);
break;
+ case BLOOM_FILTER:
+ buffer = columnIndexDirectory.getBloomFilterBufferFor(column);
+ break;
default:
throw new RuntimeException("Unknown index type: " + type.name());
}
@@ -412,6 +415,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
return columnIndexDirectory.newForwardIndexBuffer(key.name, sizeBytes);
case INVERTED_INDEX:
return columnIndexDirectory.newInvertedIndexBuffer(key.name, sizeBytes);
+ case BLOOM_FILTER:
+ return columnIndexDirectory.newBloomFilterBuffer(key.name, sizeBytes);
default:
throw new RuntimeException("Unknown index type: " + indexType.name() +
" for directory: " + segmentDirectory);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
index cf22cc6..5271985 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java
@@ -105,6 +105,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
}
@Override
+ public PinotDataBuffer getBloomFilterBufferFor(String column)
+ throws IOException {
+ return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER);
+ }
+
+ @Override
public boolean hasIndexFor(String column, ColumnIndexType type) {
IndexKey key = new IndexKey(column, type);
return columnEntries.containsKey(key);
@@ -128,6 +134,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
return allocNewBufferInternal(column, ColumnIndexType.INVERTED_INDEX, sizeBytes, "inverted_index.create");
}
+ @Override
+ public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
+ throws IOException {
+ return allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, sizeBytes, "bloom_filter.create");
+ }
+
private PinotDataBuffer checkAndGetIndexBuffer(String column, ColumnIndexType type) {
IndexKey key = new IndexKey(column, type);
IndexEntry entry = columnEntries.get(key);
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index aedd6db..f3d1c52 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.core.segment.virtualcolumn;
import com.linkedin.pinot.core.io.reader.DataFileReader;
import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer;
import com.linkedin.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
@@ -51,4 +52,9 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
public Dictionary getDictionary() {
return _dictionary;
}
+
+ @Override
+ public BloomFilterReader getBloomFilter() { // virtual columns expose no bloom filter
+ return null; // callers must null-check before using the reader
+ }
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
index b450ff5..8959969 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java
@@ -21,6 +21,7 @@ import com.linkedin.pinot.core.common.DataSource;
import com.linkedin.pinot.core.common.DataSourceMetadata;
import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -100,6 +101,11 @@ public class StarTreeDimensionDataSource extends DataSource {
}
@Override
+ public BloomFilterReader getBloomFilter() { // star-tree dimension data sources expose no bloom filter
+ return null; // callers must null-check before using the reader
+ }
+
+ @Override
public Dictionary getDictionary() {
return _dictionary;
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
index 31f742a..f9ac54d 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java
@@ -24,6 +24,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.BaseChunkSingleValueReader;
import com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader;
import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import com.linkedin.pinot.core.operator.blocks.SingleValueBlock;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.core.segment.index.readers.Dictionary;
import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader;
import com.linkedin.pinot.core.segment.memory.PinotDataBuffer;
@@ -110,6 +111,11 @@ public class StarTreeMetricDataSource extends DataSource {
}
@Override
+ public BloomFilterReader getBloomFilter() { // star-tree metric data sources expose no bloom filter
+ return null; // callers must null-check before using the reader
+ }
+
+ @Override
protected Block getNextBlock() {
return new SingleValueBlock(_forwardIndex, _numDocs, _dataType, null);
}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
index a278149..52b8542 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -86,10 +86,10 @@ public class RealtimeNoDictionaryTest {
}
Map<String, DataSource> dataSourceBlock = new HashMap<>();
- dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null));
- dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null));
- dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null));
- dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null));
+ dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null, null)); // NOTE(review): the added trailing null presumably fills the new bloom filter reader argument — confirm
+ dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null, null));
+ dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null, null));
+ dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null, null));
return new DataFetcher(dataSourceBlock);
}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
new file mode 100644
index 0000000..4a77a5d
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.segment.index.creator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.BloomFilter;
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.core.bloom.BloomFilterType;
+import com.linkedin.pinot.core.bloom.GuavaOnHeapBloomFilter;
+import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
+import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import com.linkedin.pinot.core.bloom.BloomFilterUtil;
+
+import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import com.google.common.hash.Funnels;
+
+
+public class BloomFilterCreatorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BloomFilterCreatorTest");
+ private static int MB_IN_BYTES = 1024 * 1024;
+
+
+ public void setUp() throws Exception {
+ if (TEMP_DIR.exists()) {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ }
+ TEMP_DIR.mkdir();
+ }
+
+ @Test
+ public void testBloomFilterUtil() {
+ // Test against the known results
+ Assert.assertEquals(BloomFilterUtil.computeNumBits(1000000, 0.03), 7298441);
+ Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.03), 72984409);
+ Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.1), 47925292);
+
+ Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(1000000, 7298441), 5);
+ Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 72984409), 5);
+ Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 47925292), 3);
+
+ double threshold = 0.001;
+ Assert.assertTrue(
+ compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(1000000, 5, 7298441), 0.03, threshold));
+ Assert.assertTrue(
+ compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 5, 72984409), 0.03, threshold));
+ Assert.assertTrue(
+ compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 3, 47925292), 0.1, threshold));
+ }
+
+ private boolean compareDouble(double a, double b, double threshold) {
+ if (Math.abs(a - b) < threshold) {
+ return true;
+ }
+ return false;
+ }
+
+ @Test
+ public void testBloomFilterCreator() throws Exception {
+ // Create bloom filter directory
+ File bloomFilterDir = new File(TEMP_DIR, "bloomFilterDir");
+ bloomFilterDir.mkdirs();
+
+ // Create a bloom filter and serialize it to a file
+ int cardinality = 10000;
+ String columnName = "testColumn";
+ BloomFilterCreator
+ bloomFilterCreator = new BloomFilterCreator(bloomFilterDir, columnName, cardinality);
+ for (int i = 0; i < 5; i++) {
+ bloomFilterCreator.add(Integer.toString(i));
+ }
+ bloomFilterCreator.close();
+
+ // Deserialize the bloom filter and validate
+ File bloomFilterFile = new File(bloomFilterDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+ try (DataInputStream in = new DataInputStream(new FileInputStream(bloomFilterFile))) {
+ BloomFilterType type = BloomFilterType.valueOf(in.readInt());
+ int version = in.readInt();
+ GuavaOnHeapBloomFilter bloomFilter = new GuavaOnHeapBloomFilter();
+
+ Assert.assertEquals(type, bloomFilter.getBloomFilterType());
+ Assert.assertEquals(version, bloomFilter.getVersion());
+
+ bloomFilter.readFrom(in);
+ for (int i = 0; i < 5; i++) {
+ Assert.assertTrue(bloomFilter.mightContain(Integer.toString(i)));
+ }
+ for (int j = 5; j < 10; j++) {
+ Assert.assertFalse(bloomFilter.mightContain(Integer.toString(j)));
+ }
+ }
+ }
+
+ @Test
+ public void testBloomFilterSize() throws Exception {
+ int cardinalityArray[] = new int[] { 10, 100, 1000, 100000, 100000 , 1000000, 5000000, 10000000};
+ for (int cardinality : cardinalityArray) {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ File indexDir = new File(TEMP_DIR, "testBloomFilterSize");
+ Preconditions.checkState(indexDir.mkdirs());
+
+ String columnName = "testSize";
+ BloomFilterCreator
+ bloomFilterCreator = new BloomFilterCreator(indexDir, columnName, cardinality);
+ bloomFilterCreator.close();
+
+ File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+
+ try (InputStream inputStream = new FileInputStream(bloomFilterFile)) {
+ byte[] bloomFilterBytes = IOUtils.toByteArray(inputStream);
+ long actualBloomFilterSize = bloomFilterBytes.length;
+ // Check if the size of bloom filter does not go beyond 1MB. Note that guava bloom filter has 11-12 bytes of
+ // overhead
+ Assert.assertTrue(actualBloomFilterSize < MB_IN_BYTES + 12);
+ }
+ }
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
index 7712ec0..a8851d0 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectoryTestHelper.java
@@ -50,6 +50,9 @@ public class ColumnIndexDirectoryTestHelper {
case INVERTED_INDEX:
buf = columnDirectory.newInvertedIndexBuffer(columnName, size);
break;
+ case BLOOM_FILTER: // mirrors the INVERTED_INDEX case for the new index type
+ buf = columnDirectory.newBloomFilterBuffer(columnName, size);
+ break;
}
return buf;
}
@@ -70,6 +73,9 @@ public class ColumnIndexDirectoryTestHelper {
case INVERTED_INDEX:
buf = columnDirectory.getInvertedIndexBufferFor(columnName);
break;
+ case BLOOM_FILTER: // mirrors the INVERTED_INDEX case for the new index type
+ buf = columnDirectory.getBloomFilterBufferFor(columnName);
+ break;
}
return buf;
}
@@ -130,6 +136,14 @@ public class ColumnIndexDirectoryTestHelper {
return invocationOnMock.getArguments()[0] + ".ii";
}
});
+ when(meta.getBloomFilterFileName(anyString()))
+ .thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ return invocationOnMock.getArguments()[0] + ".bloom"; // NOTE(review): confirm ".bloom" matches the real bloom filter file extension
+ }
+ });
return meta;
}
}
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
index 22fc481..1e3fba1 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -21,6 +21,7 @@ import com.linkedin.pinot.common.utils.request.FilterQueryTree;
import com.linkedin.pinot.common.utils.request.RequestUtils;
import com.linkedin.pinot.core.query.pruner.ColumnValueSegmentPruner;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
+import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader;
import com.linkedin.pinot.pql.parsers.Pql2Compiler;
import java.util.HashMap;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
public class ColumnValueSegmentPrunerTest {
private static final Pql2Compiler COMPILER = new Pql2Compiler();
private static final Map<String, ColumnMetadata> COLUMN_METADATA_MAP = new HashMap<>();
+ private static final Map<String, BloomFilterReader> BLOOM_FILTER_MAP = new HashMap<>(); // NOTE(review): not populated in the hunks shown, so pruning presumably runs without bloom filters — confirm
static {
COLUMN_METADATA_MAP.put("time", new ColumnMetadata.Builder().setColumnName("time")
@@ -95,6 +97,6 @@ public class ColumnValueSegmentPrunerTest {
BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
ColumnValueSegmentPruner pruner = new ColumnValueSegmentPruner();
- return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP);
+ return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP, BLOOM_FILTER_MAP);
}
}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
index 7c6eec5..01e7a0c 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -58,6 +58,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
private static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+ private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin"); // default columns that get bloom filters in integration tests
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
@@ -152,6 +153,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
@Nullable
+ protected List<String> getBloomFilterIndexColumns() { // overridable like the other index-column getters; may return null
+ return DEFAULT_BLOOM_FILTER_COLUMNS;
+ }
+
+ @Nullable
protected List<String> getRawIndexColumns() {
return DEFAULT_RAW_INDEX_COLUMNS;
}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index e931fd0..7803874 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -279,15 +279,15 @@ public abstract class ClusterTest extends ControllerTest {
}
protected void addOfflineTable(String tableName, SegmentVersion segmentVersion) throws Exception {
- addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null);
+ addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null); // no time column, tenants, load mode, inverted index, bloom filter or task config
}
protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- TableTaskConfig taskConfig) throws Exception {
+ List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception { // bloomFilterColumns may be null
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig);
if (!isUsingNewConfigFormat()) {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
@@ -298,10 +298,10 @@ public abstract class ClusterTest extends ControllerTest {
protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- TableTaskConfig taskConfig) throws Exception {
+ List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception {
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig);
if (!isUsingNewConfigFormat()) {
sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString());
@@ -312,7 +312,7 @@ public abstract class ClusterTest extends ControllerTest {
private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
- List<String> invertedIndexColumns, TableTaskConfig taskConfig) throws Exception {
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig) throws Exception {
return new TableConfig.Builder(Helix.TableType.OFFLINE).setTableName(tableName)
.setTimeColumnName(timeColumnName)
.setTimeType(timeType)
@@ -322,6 +322,7 @@ public abstract class ClusterTest extends ControllerTest {
.setLoadMode(loadMode)
.setSegmentVersion(segmentVersion.toString())
.setInvertedIndexColumns(invertedIndexColumns)
+ .setBloomFilterColumns(bloomFilterColumns) // forwarded as-is, including null
.setTaskConfig(taskConfig)
.build();
}
@@ -374,8 +375,8 @@ public abstract class ClusterTest extends ControllerTest {
protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> noDictionaryColumns, TableTaskConfig taskConfig,
- String streamConsumerFactoryName) throws Exception {
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
+ TableTaskConfig taskConfig, String streamConsumerFactoryName) throws Exception {
Map<String, String> streamConfigs = new HashMap<>();
String streamType = "kafka";
streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
@@ -417,6 +418,7 @@ public abstract class ClusterTest extends ControllerTest {
.setLoadMode(loadMode)
.setSortedColumn(sortedColumn)
.setInvertedIndexColumns(invertedIndexColumns)
+ .setBloomFilterColumns(bloomFilterColumns)
.setNoDictionaryColumns(noDictionaryColumns)
.setStreamConfigs(streamConfigs)
.setTaskConfig(taskConfig)
@@ -437,13 +439,13 @@ public abstract class ClusterTest extends ControllerTest {
protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> noDictionaryColumns, TableTaskConfig taskConfig,
- String streamConsumerFactoryName) throws Exception {
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
+ TableTaskConfig taskConfig, String streamConsumerFactoryName) throws Exception {
addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
- invertedIndexColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig); // same bloom filter columns for the offline half
addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- noDictionaryColumns, taskConfig, streamConsumerFactoryName);
+ bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName); // ...and for the realtime half
}
protected void createBrokerTenant(String tenantName, int brokerCount) throws Exception {
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 5ad0fcf..3c4323f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -89,7 +89,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Create the table
addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
- getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getTaskConfig());
+ getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig()); // bloom filter columns precede the task config
// Generate and push Pinot segments from Hadoop
generateAndPushSegmentsFromHadoop();
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
index 023cae3..3c8fb8f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -134,8 +134,8 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
- TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getRawIndexColumns(),
- getTaskConfig(), getStreamConsumerFactoryClassName());
+ TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), // bloom filter columns sit between the inverted-index and no-dictionary (raw index) arguments
+ getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
completeTableConfiguration();
}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index ec22006..8010a6c 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -289,7 +289,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
String timeType = outgoingTimeUnit.toString();
addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
_realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
- _sortedColumn, _invertedIndexColumns, null, null, getStreamConsumerFactoryClassName());
+ _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName()); // nulls: bloomFilterColumns, noDictionaryColumns, taskConfig
// Upload all segments
uploadSegments(_tarDir);
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 383c95b..4e5febf 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -47,6 +47,7 @@ import org.testng.annotations.Test;
public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
+ private static final int NUM_SEGMENTS = 12; // NOTE(review): presumably the number of segments uploaded during setup — confirm
// For inverted index triggering test
private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
@@ -54,6 +55,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Arrays.asList("Carrier"); // For bloom filter triggering test
+ private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
+
// For default columns test
private static final String SCHEMA_WITH_EXTRA_COLUMNS =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema";
@@ -121,7 +125,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Create the table
addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
- getTaskConfig());
+ getBloomFilterIndexColumns(), getTaskConfig());
completeTableConfiguration();
@@ -149,7 +153,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
- UPDATED_INVERTED_INDEX_COLUMNS, getTaskConfig());
+ UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig()); // NOTE(review): null here clears bloomFilterColumns in the updated table config — confirm intended
updateTableConfiguration();
@@ -170,6 +174,35 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}, 600_000L, "Failed to generate inverted index");
}
+ @Test
+ public void testBloomFilterTriggering() throws Exception {
+ final long numTotalDocs = getCountStarResult();
+ JSONObject queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ Assert.assertEquals(queryResponse.getLong("numSegmentsProcessed"), NUM_SEGMENTS); // before the reload no segment is pruned on Carrier
+
+ // Update table config and trigger reload
+ updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
+ null, UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig()); // NOTE(review): null here clears invertedIndexColumns in the updated table config — confirm intended
+
+ updateTableConfiguration();
+
+ sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ JSONObject queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+ // Total docs should not change during reload
+ Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+ return queryResponse.getLong("numSegmentsProcessed") == 0L; // done once every segment is pruned via the Carrier bloom filter
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 600_000L, "Failed to generate bloom filter");
+ }
+
/**
* We will add extra new columns to the schema to test adding new columns with default value to the offline segments.
* <p>New columns are: (name, field type, data type, single/multi value, default null value)
@@ -362,7 +395,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
public void testBrokerResponseMetadata() throws Exception {
super.testBrokerResponseMetadata();
}
-
+
@Test
public void testUDF() throws Exception {
String pqlQuery = "SELECT COUNT(*) FROM mytable GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS')";
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 37a117e..d72d41e 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -95,8 +95,8 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, null, null,
- getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getRawIndexColumns(), getTaskConfig(),
- getStreamConsumerFactoryClassName());
+ getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), // bloom filter columns sit between the inverted-index and no-dictionary (raw index) arguments
+ getTaskConfig(), getStreamConsumerFactoryClassName());
completeTableConfiguration();
}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a2cdb1a..e47c87b 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -79,9 +79,9 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
taskTypeConfigsMap.put(TestTaskGenerator.TASK_TYPE, Collections.emptyMap());
taskConfig.setTaskTypeConfigsMap(taskTypeConfigsMap);
- addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, taskConfig);
- addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, taskConfig);
- addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null);
+ addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig); // the null after invertedIndexColumns is the new bloomFilterColumns argument
+ addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
+ addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null);
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java b/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java
deleted file mode 100644
index 03cafee..0000000
--- a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkRunner.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.perf;
-
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriver;
-import java.util.Arrays;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>PerfBenchmarkRunner</code> class is a tool to start Pinot cluster with optional preloaded segments.
- */
-public class PerfBenchmarkRunner {
- private PerfBenchmarkRunner() {
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PerfBenchmarkRunner.class);
-
- /**
- * Start Pinot server with pre-loaded segments.
- *
- * @param dataDir data directory.
- * @param tableNames list of table names to be loaded.
- * @param invertedIndexColumns list of inverted index columns.
- * @throws Exception
- */
- public static void startServerWithPreLoadedSegments(String dataDir, List<String> tableNames,
- List<String> invertedIndexColumns)
- throws Exception {
- LOGGER.info("Starting server and uploading segments.");
- PerfBenchmarkDriver driver = PerfBenchmarkDriver.startComponents(false, false, false, true, dataDir);
- for (String tableName : tableNames) {
- com.linkedin.pinot.tools.perf.PerfBenchmarkRunner.loadTable(driver, dataDir, tableName, invertedIndexColumns);
- }
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length > 0) {
- if (args[0].equalsIgnoreCase("startAllButServer") || args[0].equalsIgnoreCase("startAll")) {
- PerfBenchmarkDriver.startComponents(true, true, true, false, null);
- }
-
- if (args[0].equalsIgnoreCase("startServerWithPreLoadedSegments") || args[0].equalsIgnoreCase("startAll")) {
- String tableNames = args[1];
- String dataDir = args[2];
- List<String> invertedIndexColumns = null;
- if (args.length == 4) {
- invertedIndexColumns = Arrays.asList(args[3].split(","));
- }
- startServerWithPreLoadedSegments(dataDir, Arrays.asList(tableNames.split(",")), invertedIndexColumns);
- }
- } else {
- LOGGER.error("Expected one of [startAll|startAllButServer|StartServerWithPreLoadedSegments]");
- }
- }
-}
diff --git a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java b/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java
deleted file mode 100644
index 2c503d4..0000000
--- a/pinot-perf/src/main/java/com/linkedin/pinot/perf/PerfBenchmarkTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.perf;
-
-import com.google.common.collect.Lists;
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriver;
-import com.linkedin.pinot.tools.perf.PerfBenchmarkDriverConf;
-import com.linkedin.pinot.tools.perf.QueryRunner;
-import java.util.ArrayList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PerfBenchmarkTest {
- private PerfBenchmarkTest() {
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PerfBenchmarkTest.class);
-
- private static void setupCluster(String dataDir, String offlineTableName) throws Exception {
- LOGGER.info("Setting up cluster");
- PerfBenchmarkDriver.startComponents(true, true, true, false, null);
- PerfBenchmarkRunner.startServerWithPreLoadedSegments(dataDir, Lists.newArrayList(offlineTableName),
- new ArrayList<String>());
- }
-
- private static void runQueries(String queryFile) throws Exception {
- System.out.println("Running queries....");
- PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
- conf.setStartBroker(false);
- conf.setStartController(false);
- conf.setStartServer(false);
- conf.setStartZookeeper(false);
- conf.setUploadIndexes(false);
- conf.setRunQueries(true);
- conf.setConfigureResources(false);
-
- QueryRunner.singleThreadedQueryRunner(conf, queryFile, 1, 3000, 10);
- LOGGER.info("Running queries completed.");
- }
-
- private static void execute(String dataDir, String offlineTableName, String queryFile) throws Exception {
- setupCluster(dataDir, offlineTableName);
- runQueries(queryFile);
- }
-
- public static void main(String[] args) {
- if (args.length != 3) {
- LOGGER.error("Incorrect number of arguments.");
- LOGGER.info("PerfBenchmarkTest <DataDir> <offlineTableName> <PathToQueryFile>");
- return;
- }
-
- try {
- PerfBenchmarkTest.execute(args[0], args[1], args[2]);
- LOGGER.info("Benchmark test completed");
- System.exit(0);
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- e.printStackTrace();
- System.exit(1);
- }
- }
-}
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
index 999f022..2368d8a 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.tools.perf;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
@@ -259,10 +260,10 @@ public class PerfBenchmarkDriver {
public void configureTable(String tableName)
throws Exception {
- configureTable(tableName, null);
+ configureTable(tableName, null, null);
}
- public void configureTable(String tableName, List<String> invertedIndexColumns)
+ public void configureTable(String tableName, List<String> invertedIndexColumns, List<String> bloomFilterColumns)
throws Exception {
TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
.setSegmentAssignmentStrategy(_segmentAssignmentStrategy)
@@ -272,6 +273,7 @@ public class PerfBenchmarkDriver {
.setLoadMode(_loadMode)
.setSegmentVersion(_segmentFormatVersion)
.setInvertedIndexColumns(invertedIndexColumns)
+ .setBloomFilterColumns(bloomFilterColumns)
.build();
_helixResourceManager.addTable(tableConfig);
}
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
index ac124fe..17fec66 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java
@@ -72,6 +72,10 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
usage = "Comma separated inverted index columns to be created (non-batch load).")
private String _invertedIndexColumns;
+ @Option(name = "-bloomFilterColumns", required = false, metaVar = "<String>",
+ usage = "Comma separated bloom filter columns to be created (non-batch load).")
+ private String _bloomFilterColumns;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"},
usage = "Print this message.")
private boolean _help = false;
@@ -132,7 +136,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
@Override
public void run() {
try {
- loadTable(driver, _dataDir, tableName, null);
+ loadTable(driver, _dataDir, tableName, null, null);
} catch (Exception e) {
LOGGER.error("Caught exception while loading table: {}", tableName, e);
}
@@ -146,14 +150,18 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
if (_invertedIndexColumns != null) {
invertedIndexColumns = Arrays.asList(_invertedIndexColumns.split(","));
}
+ List<String> bloomFilterColumns = null;
+ if (_bloomFilterColumns != null) {
+ bloomFilterColumns = Arrays.asList(_bloomFilterColumns.split(","));
+ }
for (String tableName : _tableNames.split(",")) {
- loadTable(driver, _dataDir, tableName, invertedIndexColumns);
+ loadTable(driver, _dataDir, tableName, invertedIndexColumns, bloomFilterColumns);
}
}
}
public static void loadTable(PerfBenchmarkDriver driver, String dataDir, String tableName,
- List<String> invertedIndexColumns)
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns)
throws Exception {
boolean tableConfigured = false;
File[] segments = new File(dataDir, tableName).listFiles();
@@ -161,7 +169,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command
for (File segment : segments) {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segment);
if (!tableConfigured) {
- driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns);
+ driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns, bloomFilterColumns);
tableConfigured = true;
}
driver.addSegment(segmentMetadata);
diff --git a/pom.xml b/pom.xml
index 81a6dbb..4d6075e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -874,6 +874,7 @@
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
+ <compilerVersion>${jdk.version}</compilerVersion>
</configuration>
</plugin>
<plugin>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org