You are viewing a plain-text version of this content; the link to the canonical version is not included in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/08 04:36:26 UTC
[incubator-pinot] 01/01: Implement off-heap bloom filter reader
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch offheap_bloom_filter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 979828bfbaaeb1a9d4707b53541782fee0bb4eec
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Wed Oct 7 21:24:55 2020 -0700
Implement off-heap bloom filter reader
---
.../org/apache/pinot/core/bloom/BloomFilter.java | 74 -------------
.../apache/pinot/core/bloom/BloomFilterUtil.java | 57 -----------
.../pinot/core/bloom/GuavaOnHeapBloomFilter.java | 76 --------------
.../core/bloom/SegmentBloomFilterFactory.java | 51 ---------
.../indexsegment/mutable/MutableSegmentImpl.java | 7 ++
.../query/pruner/ColumnValueSegmentPruner.java | 2 +-
.../creator/BloomFilterCreator.java} | 42 +++-----
.../creator/impl/bloom/BloomFilterCreator.java | 68 ------------
.../impl/bloom/OnHeapGuavaBloomFilterCreator.java | 69 +++++++++++++
.../index/column/PhysicalColumnIndexContainer.java | 16 +--
.../segment/index/loader/IndexLoadingConfig.java | 19 ++--
.../loader/bloomfilter/BloomFilterHandler.java | 58 +++++------
.../segment/index/readers/BloomFilterReader.java | 43 +++-----
.../readers/bloom/BloomFilterReaderFactory.java} | 39 ++-----
.../bloom/GuavaBloomFilterReaderUtils.java} | 41 +++-----
.../bloom/OffHeapGuavaBloomFilterReader.java | 81 +++++++++++++++
.../apache/pinot/core/util/TableConfigUtils.java | 18 ++--
.../index/creator/BloomFilterCreatorTest.java | 114 +++++----------------
.../pinot/spi/config/table/BloomFilterConfig.java | 40 +++-----
.../pinot/spi/config/table/IndexingConfig.java | 10 ++
20 files changed, 320 insertions(+), 605 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java
deleted file mode 100644
index b4aab57..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-
-/**
- * Interface for bloom filter
- */
-public interface BloomFilter {
-
- /**
- * Get the version of bloom filter implementation
- * @return a version
- */
- int getVersion();
-
- /**
- * Get the type of the bloom filter
- * @return a bloom filter type
- */
- BloomFilterType getBloomFilterType();
-
- /**
- * Add element to bloom filter
- *
- * @param input input object
- */
- void add(Object input);
-
- /**
- * Check if the input element may exist or not
- *
- * @param input input object for testing
- * @return true if the input may exist, false if it does not exist
- */
- boolean mightContain(Object input);
-
- /**
- * Serialize bloom filter to output stream.
- *
- * @param out output stream
- * @throws IOException
- */
- void writeTo(OutputStream out)
- throws IOException;
-
- /**
- * Deserialize the bloom filter from input stream.
- * @param in input stream
- * @throws IOException
- */
- void readFrom(InputStream in)
- throws IOException;
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java
deleted file mode 100644
index fc83b01..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-/**
- * Util class for bloom filter
- */
-public class BloomFilterUtil {
-
- private BloomFilterUtil() {
- }
-
- public static long computeNumBits(long cardinality, double maxFalsePosProbability) {
- return (long) (Math
- .ceil((cardinality * Math.log(maxFalsePosProbability)) / Math.log(1.0 / Math.pow(2.0, Math.log(2.0)))));
- }
-
- public static int computeNumberOfHashFunctions(long cardinality, long numBits) {
- return (int) Math.max(1.0, Math.round(((double) numBits / cardinality) * Math.log(2.0)));
- }
-
- public static double computeMaxFalsePosProbability(long cardinality, int numHashFunction, long numBits) {
- return Math.pow(1.0 - Math.exp(-1.0 * numHashFunction / ((double) numBits / cardinality)), numHashFunction);
- }
-
- public static double computeMaxFalsePositiveProbabilityForNumBits(long cardinality, long maxNumBits,
- double defaultMaxFalsePosProbability) {
- // Get the number of bits required for achieving default false positive probability
- long numBitsRequired = BloomFilterUtil.computeNumBits(cardinality, defaultMaxFalsePosProbability);
-
- // If the size of bloom filter is smaller than 1MB, use default max false positive probability
- if (numBitsRequired <= maxNumBits) {
- return defaultMaxFalsePosProbability;
- }
-
- // If the size of bloom filter is larger than 1MB, compute the maximum false positive probability within
- // storage limit
- int numHashFunction = BloomFilterUtil.computeNumberOfHashFunctions(cardinality, maxNumBits);
- return BloomFilterUtil.computeMaxFalsePosProbability(cardinality, numHashFunction, maxNumBits);
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java
deleted file mode 100644
index 1339516..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-import com.google.common.hash.Funnels;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-
-/**
- * Bloom filter implementation with guava library
- */
-public class GuavaOnHeapBloomFilter implements BloomFilter {
- // Increment the version when the bloom filter implementation becomes backward incompatible
- private static final int VERSION = 1;
-
- private com.google.common.hash.BloomFilter _bloomFilter;
-
- public GuavaOnHeapBloomFilter() {
- }
-
- public GuavaOnHeapBloomFilter(int cardinality, double maxFalsePosProbability) {
- _bloomFilter = com.google.common.hash.BloomFilter
- .create(Funnels.stringFunnel(Charset.forName("UTF-8")), cardinality, maxFalsePosProbability);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public BloomFilterType getBloomFilterType() {
- return BloomFilterType.GUAVA_ON_HEAP;
- }
-
- @Override
- public void add(Object input) {
- _bloomFilter.put(input.toString());
- }
-
- @Override
- public boolean mightContain(Object input) {
- return _bloomFilter.mightContain(input.toString());
- }
-
- @Override
- public void writeTo(OutputStream out)
- throws IOException {
- _bloomFilter.writeTo(out);
- }
-
- @Override
- public void readFrom(InputStream in)
- throws IOException {
- _bloomFilter = com.google.common.hash.BloomFilter.readFrom(in, Funnels.stringFunnel(Charset.forName("UTF-8")));
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java
deleted file mode 100644
index 4d6dbd9..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.bloom;
-
-/**
- * Factory for bloom filter
- */
-public class SegmentBloomFilterFactory {
-
- /**
- * Factory used when creating a new bloom filter
- *
- * @param cardinality cardinality of column
- * @param maxFalsePosProbability maximum false positive probability
- * @return a bloom filter
- */
- public static BloomFilter createSegmentBloomFilter(int cardinality, double maxFalsePosProbability) {
- // TODO: when we add more types of bloom filter, we will need to add a new config and wire in here
- return new GuavaOnHeapBloomFilter(cardinality, maxFalsePosProbability);
- }
-
- /**
- * Factory used when deserializing a bloom filter
- *
- * @param type a bloom filter type
- * @return a bloom filter based on the given type
- */
- public static BloomFilter createSegmentBloomFilter(BloomFilterType type) {
- switch (type) {
- case GUAVA_ON_HEAP:
- return new GuavaOnHeapBloomFilter();
- }
- throw new RuntimeException("Invalid bloom filter type: " + type.toString());
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 57534cf..38e061f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -1067,6 +1067,13 @@ public class MutableSegmentImpl implements MutableSegment {
_logger.error("Caught exception while closing text index for column: {}, continuing with error", column, e);
}
}
+ if (_bloomFilter != null) {
+ try {
+ _bloomFilter.close();
+ } catch (Exception e) {
+ _logger.error("Caught exception while closing bloom filter for column: {}, continuing with error", column, e);
+ }
+ }
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 2704dc6..98eb36e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -155,7 +155,7 @@ public class ColumnValueSegmentPruner implements SegmentPruner {
// Check bloom filter
BloomFilterReader bloomFilter = dataSource.getBloomFilter();
if (bloomFilter != null) {
- if (!bloomFilter.mightContain(value)) {
+ if (!bloomFilter.mightContain(value.toString())) {
return true;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
index ac6efd8..7647b96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java
@@ -16,38 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.creator;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Closeable;
+import java.io.IOException;
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
- // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
- // when serializing/deserializing a bloom filter
- GUAVA_ON_HEAP(1);
-
- private int _value;
- private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
- BloomFilterType(int value) {
- _value = value;
- }
-
- static {
- for (BloomFilterType pageType : BloomFilterType.values()) {
- _bloomFilterTypeMap.put(pageType._value, pageType);
- }
- }
+public interface BloomFilterCreator extends Closeable {
- public static BloomFilterType valueOf(int pageType) {
- return _bloomFilterTypeMap.get(pageType);
- }
+ /**
+ * Adds a value to the bloom filter.
+ */
+ void add(String value);
- public int getValue() {
- return _value;
- }
+ /**
+ * Seals the index and flushes it to disk.
+ */
+ void seal()
+ throws IOException;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
deleted file mode 100644
index 7c1a6c4..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.creator.impl.bloom;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.apache.pinot.core.bloom.BloomFilter;
-import org.apache.pinot.core.bloom.BloomFilterUtil;
-import org.apache.pinot.core.bloom.SegmentBloomFilterFactory;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
-
-
-/**
- * Bloom filter creator
- *
- * Note:
- * 1. Currently, we limit the filter size to 1MB to avoid the heap overhead. We can remove it once we have the offheap
- * implementation of the bloom filter.
- * 2. When capping the bloom filter to 1MB, max false pos steeply grows from 1 million cardinality. If the column has
- * larger than "5 million" cardinality, it is not recommended to use bloom filter since maxFalsePosProb is already
- * 0.45 when the filter size is 1MB.
- */
-public class BloomFilterCreator implements AutoCloseable {
- private static double DEFAULT_MAX_FALSE_POS_PROBABILITY = 0.05;
- private static int MB_IN_BITS = 8388608;
-
- private BloomFilter _bloomFilter;
- private File _bloomFilterFile;
-
- public BloomFilterCreator(File indexDir, String columnName, int cardinality) {
- _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
- double maxFalsePosProbability = BloomFilterUtil
- .computeMaxFalsePositiveProbabilityForNumBits(cardinality, MB_IN_BITS, DEFAULT_MAX_FALSE_POS_PROBABILITY);
- _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(cardinality, maxFalsePosProbability);
- }
-
- @Override
- public void close()
- throws IOException {
- try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) {
- outputStream.writeInt(_bloomFilter.getBloomFilterType().getValue());
- outputStream.writeInt(_bloomFilter.getVersion());
- _bloomFilter.writeTo(outputStream);
- }
- }
-
- public void add(Object input) {
- _bloomFilter.add(input.toString());
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java
new file mode 100644
index 0000000..fec9a76
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.bloom;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+
+
+/**
+ * On-heap creator for guava bloom filter.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OnHeapGuavaBloomFilterCreator implements BloomFilterCreator {
+ public static final int TYPE_VALUE = 1;
+ public static final int VERSION = 1;
+
+ private final File _bloomFilterFile;
+ private final BloomFilter<String> _bloomFilter;
+
+ public OnHeapGuavaBloomFilterCreator(File indexDir, String columnName, int cardinality,
+ BloomFilterConfig bloomFilterConfig) {
+ _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+ _bloomFilter =
+ BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), cardinality, bloomFilterConfig.getFpp());
+ }
+
+ @Override
+ public void add(String value) {
+ _bloomFilter.put(value);
+ }
+
+ @Override
+ public void seal()
+ throws IOException {
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) {
+ out.writeInt(TYPE_VALUE);
+ out.writeInt(VERSION);
+ _bloomFilter.writeTo(out);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 8405cf3..0573afe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
import org.apache.pinot.core.segment.index.readers.SortedIndexReader;
import org.apache.pinot.core.segment.index.readers.StringDictionary;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory;
import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
@@ -66,7 +67,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final InvertedIndexReader<?> _rangeIndex;
private final TextIndexReader _textIndex;
private final BaseImmutableDictionary _dictionary;
- private final BloomFilterReader _bloomFilterReader;
+ private final BloomFilterReader _bloomFilter;
private final NullValueVectorReaderImpl _nullValueVectorReader;
public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
@@ -82,7 +83,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
- loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
+ loadBloomFilter = indexLoadingConfig.getBloomFilterConfigs().containsKey(columnName);
loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
}
@@ -108,9 +109,9 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
//bloom filter
if (loadBloomFilter) {
PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
- _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer);
+ _bloomFilter = BloomFilterReaderFactory.getBloomFilterReader(bloomFilterBuffer);
} else {
- _bloomFilterReader = null;
+ _bloomFilter = null;
}
// Dictionary-based index
_dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
@@ -150,7 +151,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
// Raw index
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
_dictionary = null;
- _bloomFilterReader = null;
+ _bloomFilter = null;
_rangeIndex = null;
_invertedIndex = null;
}
@@ -183,7 +184,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
@Override
public BloomFilterReader getBloomFilter() {
- return _bloomFilterReader;
+ return _bloomFilter;
}
@Override
@@ -265,5 +266,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
if (_textIndex != null) {
_textIndex.close();
}
+ if (_bloomFilter != null) {
+ _bloomFilter.close();
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index e296da2..798ce61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -52,7 +53,7 @@ public class IndexLoadingConfig {
private Map<String, String> _noDictionaryConfig = new HashMap<>();
private Set<String> _varLengthDictionaryColumns = new HashSet<>();
private Set<String> _onHeapDictionaryColumns = new HashSet<>();
- private Set<String> _bloomFilterColumns = new HashSet<>();
+ private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private boolean _enableDefaultStarTree;
@@ -98,7 +99,13 @@ public class IndexLoadingConfig {
List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
if (bloomFilterColumns != null) {
- _bloomFilterColumns.addAll(bloomFilterColumns);
+ for (String bloomFilterColumn : bloomFilterColumns) {
+ _bloomFilterConfigs.put(bloomFilterColumn, new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP));
+ }
+ }
+ Map<String, BloomFilterConfig> bloomFilterConfigs = indexingConfig.getBloomFilterConfigs();
+ if (bloomFilterConfigs != null) {
+ _bloomFilterConfigs.putAll(bloomFilterConfigs);
}
List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
@@ -265,8 +272,8 @@ public class IndexLoadingConfig {
}
@VisibleForTesting
- public void setBloomFilterColumns(Set<String> bloomFilterColumns) {
- _bloomFilterColumns = bloomFilterColumns;
+ public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) {
+ _bloomFilterConfigs = bloomFilterConfigs;
}
@VisibleForTesting
@@ -290,8 +297,8 @@ public class IndexLoadingConfig {
return _onHeapDictionaryColumns;
}
- public Set<String> getBloomFilterColumns() {
- return _bloomFilterColumns;
+ public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
+ return _bloomFilterConfigs;
}
public boolean isEnableDynamicStarTreeCreation() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
index 6c6ec81..1e15e79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -21,17 +21,20 @@ package org.apache.pinot.core.segment.index.loader.bloomfilter;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.loader.LoaderUtils;
import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.BytesDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.segment.index.readers.DoubleDictionary;
import org.apache.pinot.core.segment.index.readers.FloatDictionary;
import org.apache.pinot.core.segment.index.readers.IntDictionary;
@@ -40,6 +43,7 @@ import org.apache.pinot.core.segment.index.readers.StringDictionary;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.apache.pinot.core.segment.store.ColumnIndexType;
import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,16 +56,18 @@ public class BloomFilterHandler {
private final SegmentDirectory.Writer _segmentWriter;
private final String _segmentName;
private final SegmentVersion _segmentVersion;
+ private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>();
- public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
- @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+ public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+ SegmentDirectory.Writer segmentWriter) {
_indexDir = indexDir;
_segmentWriter = segmentWriter;
_segmentName = segmentMetadata.getName();
_segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+ _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
- for (String column : indexLoadingConfig.getBloomFilterColumns()) {
+ for (String column : _bloomFilterConfigs.keySet()) {
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
_bloomFilterColumns.add(columnMetadata);
@@ -75,6 +81,7 @@ public class BloomFilterHandler {
if (columnMetadata.hasDictionary()) {
createBloomFilterForColumn(columnMetadata);
}
+ // TODO: Support raw index
}
}
@@ -102,19 +109,17 @@ public class BloomFilterHandler {
}
// Create new bloom filter for the column.
- LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, columnName);
- try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnName, columnMetadata.getCardinality())) {
- if (columnMetadata.hasDictionary()) {
- // Read dictionary
- try (BaseImmutableDictionary dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) {
- for (int i = 0; i < dictionaryReader.length(); i++) {
- creator.add(dictionaryReader.get(i));
- }
- }
- } else {
- // Read the forward index
- throw new UnsupportedOperationException("Bloom filters not supported for no dictionary columns");
+ BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
+ LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", _segmentName, columnName,
+ bloomFilterConfig);
+ try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName,
+ columnMetadata.getCardinality(), bloomFilterConfig);
+ Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
+ int length = dictionary.length();
+ for (int i = 0; i < length; i++) {
+ bloomFilterCreator.add(dictionary.getStringValue(i));
}
+ bloomFilterCreator.seal();
}
// For v3, write the generated bloom filter file into the single file and remove it.
@@ -133,29 +138,24 @@ public class BloomFilterHandler {
PinotDataBuffer dictionaryBuffer =
segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
int cardinality = columnMetadata.getCardinality();
- BaseImmutableDictionary dictionaryReader;
DataType dataType = columnMetadata.getDataType();
switch (dataType) {
case INT:
- dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality);
- break;
+ return new IntDictionary(dictionaryBuffer, cardinality);
case LONG:
- dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality);
- break;
+ return new LongDictionary(dictionaryBuffer, cardinality);
case FLOAT:
- dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality);
- break;
+ return new FloatDictionary(dictionaryBuffer, cardinality);
case DOUBLE:
- dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality);
- break;
+ return new DoubleDictionary(dictionaryBuffer, cardinality);
case STRING:
- dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
+ return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(),
(byte) columnMetadata.getPaddingCharacter());
- break;
+ case BYTES:
+ return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength());
default:
throw new IllegalStateException(
"Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName());
}
- return dictionaryReader;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
index 63bebde..09a1dcf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java
@@ -18,40 +18,23 @@
*/
package org.apache.pinot.core.segment.index.readers;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.pinot.core.bloom.BloomFilter;
-import org.apache.pinot.core.bloom.BloomFilterType;
-import org.apache.pinot.core.bloom.SegmentBloomFilterFactory;
-import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import java.io.Closeable;
/**
- * Bloom filter reader
+ * Interface for bloom filter reader.
*/
-public class BloomFilterReader {
+public interface BloomFilterReader extends Closeable {
- private BloomFilter _bloomFilter;
+ /**
+ * Returns {@code true} if the given value might have been put in this bloom filter, {@code false} otherwise.
+ */
+ boolean mightContain(String value);
- public BloomFilterReader(PinotDataBuffer bloomFilterBuffer)
- throws IOException {
- byte[] buffer = new byte[(int) bloomFilterBuffer.size()];
- bloomFilterBuffer.copyTo(0, buffer);
-
- try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
- BloomFilterType bloomFilterType = BloomFilterType.valueOf(in.readInt());
- int version = in.readInt();
- _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(bloomFilterType);
- if (version != _bloomFilter.getVersion()) {
- throw new IOException(
- "Unexpected bloom filter version (type: " + bloomFilterType.toString() + ", version: " + version);
- }
- _bloomFilter.readFrom(in);
- }
- }
-
- public boolean mightContain(Object key) {
- return _bloomFilter.mightContain(key.toString());
- }
+ /**
+ * Returns {@code true} if the value with the given hash might have been put in this bloom filter, {@code false}
+ * otherwise.
+ * <p>This method is provided to prevent hashing the same value multiple times.
+ */
+ boolean mightContain(byte[] hash);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
index ac6efd8..e507b3d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java
@@ -16,38 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.index.readers.bloom;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
- // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
- // when serializing/deserializing a bloom filter
- GUAVA_ON_HEAP(1);
-
- private int _value;
- private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
- BloomFilterType(int value) {
- _value = value;
- }
-
- static {
- for (BloomFilterType pageType : BloomFilterType.values()) {
- _bloomFilterTypeMap.put(pageType._value, pageType);
- }
- }
-
- public static BloomFilterType valueOf(int pageType) {
- return _bloomFilterTypeMap.get(pageType);
+public class BloomFilterReaderFactory {
+ private BloomFilterReaderFactory() {
}
- public int getValue() {
- return _value;
+ public static BloomFilterReader getBloomFilterReader(PinotDataBuffer dataBuffer) {
+ int typeValue = dataBuffer.getInt(0);
+ int version = dataBuffer.getInt(4);
+ Preconditions.checkState(typeValue == 1 && version == 1);
+ return new OffHeapGuavaBloomFilterReader(dataBuffer.view(8, dataBuffer.size()));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
index ac6efd8..648fdff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java
@@ -16,38 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.core.segment.index.readers.bloom;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.pinot.spi.utils.StringUtils;
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
- // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
- // when serializing/deserializing a bloom filter
- GUAVA_ON_HEAP(1);
-
- private int _value;
- private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
-
- BloomFilterType(int value) {
- _value = value;
+@SuppressWarnings("UnstableApiUsage")
+public class GuavaBloomFilterReaderUtils {
+ private GuavaBloomFilterReaderUtils() {
}
- static {
- for (BloomFilterType pageType : BloomFilterType.values()) {
- _bloomFilterTypeMap.put(pageType._value, pageType);
- }
- }
-
- public static BloomFilterType valueOf(int pageType) {
- return _bloomFilterTypeMap.get(pageType);
- }
+ // DO NOT change the hash function. It has to be aligned with the bloom filter creator.
+ private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
- public int getValue() {
- return _value;
+ /**
+ * Returns the hash of the given value as a byte array.
+ */
+ public static byte[] hash(String value) {
+ return HASH_FUNCTION.hashBytes(StringUtils.encodeUtf8(value)).asBytes();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java
new file mode 100644
index 0000000..19a632f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers.bloom;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * Off-heap reader for guava bloom filter.
+ * <p>The behavior should be aligned with {@link com.google.common.hash.BloomFilter}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class OffHeapGuavaBloomFilterReader implements BloomFilterReader {
+ private final int _numHashFunctions;
+ private final long _numBits;
+ private final PinotDataBuffer _valueBuffer;
+
+ /**
+ * Format of the data buffer header:
+ * - Strategy ordinal: 1 byte
+ * - Number of hash functions: 1 byte
+ * - Number of long values: 4 bytes
+ */
+ public OffHeapGuavaBloomFilterReader(PinotDataBuffer dataBuffer) {
+ byte strategyOrdinal = dataBuffer.getByte(0);
+ Preconditions.checkState(strategyOrdinal == 1, "Invalid strategy ordinal: %s", strategyOrdinal);
+ _numHashFunctions = dataBuffer.getByte(1) & 0xFF;
+ _numBits = (long) dataBuffer.getInt(2) * Long.SIZE;
+ _valueBuffer = dataBuffer.view(6, dataBuffer.size());
+ }
+
+ @Override
+ public boolean mightContain(String value) {
+ return mightContain(GuavaBloomFilterReaderUtils.hash(value));
+ }
+
+ @Override
+ public boolean mightContain(byte[] hash) {
+ long hash1 = Longs.fromBytes(hash[7], hash[6], hash[5], hash[4], hash[3], hash[2], hash[1], hash[0]);
+ long hash2 = Longs.fromBytes(hash[15], hash[14], hash[13], hash[12], hash[11], hash[10], hash[9], hash[8]);
+ long combinedHash = hash1;
+ for (int i = 0; i < _numHashFunctions; i++) {
+ long bitIndex = (combinedHash & Long.MAX_VALUE) % _numBits;
+ // NOTE: Guava bloom filter stores bits in a long array. Inside each long value, the bits are stored in the
+ // reverse order (the first bit is stored as the right most bit of the long).
+ int longIndex = (int) (bitIndex >>> 6);
+ int bitIndexInLong = (int) (bitIndex & 0x3F);
+ int byteIndex = (longIndex << 3) | (7 - (bitIndexInLong >>> 3));
+ if ((_valueBuffer.getByte(byteIndex) & (1 << (bitIndexInLong & 7))) == 0) {
+ return false;
+ }
+ combinedHash += hash2;
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+ // caller is responsible for closing the PinotDataBuffer.
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index d793f34..18267f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -40,7 +40,6 @@ import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
@@ -294,14 +293,19 @@ public final class TableConfigUtils {
noDictionaryColumnsSet.add(columnName);
}
}
+ Set<String> bloomFilterColumns = new HashSet<>();
if (indexingConfig.getBloomFilterColumns() != null) {
- for (String columnName : indexingConfig.getBloomFilterColumns()) {
- if (noDictionaryColumnsSet.contains(columnName)) {
- throw new IllegalStateException(
- "Cannot create a Bloom Filter on column " + columnName + " specified in the noDictionaryColumns config");
- }
- columnNameToConfigMap.put(columnName, "Bloom Filter Config");
+ bloomFilterColumns.addAll(indexingConfig.getBloomFilterColumns());
+ }
+ if (indexingConfig.getBloomFilterConfigs() != null) {
+ bloomFilterColumns.addAll(indexingConfig.getBloomFilterConfigs().keySet());
+ }
+ for (String bloomFilterColumn : bloomFilterColumns) {
+ if (noDictionaryColumnsSet.contains(bloomFilterColumn)) {
+ throw new IllegalStateException("Cannot create a Bloom Filter on column " + bloomFilterColumn
+ + " specified in the noDictionaryColumns config");
}
+ columnNameToConfigMap.put(bloomFilterColumn, "Bloom Filter Config");
}
if (indexingConfig.getInvertedIndexColumns() != null) {
for (String columnName : indexingConfig.getInvertedIndexColumns()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
index e89b36d..4ecc586 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java
@@ -18,120 +18,54 @@
*/
package org.apache.pinot.core.segment.index.creator;
-import com.google.common.base.Preconditions;
-import java.io.DataInputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.pinot.core.bloom.BloomFilterType;
-import org.apache.pinot.core.bloom.BloomFilterUtil;
-import org.apache.pinot.core.bloom.GuavaOnHeapBloomFilter;
+import org.apache.pinot.core.segment.creator.BloomFilterCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator;
+import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
+import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
+import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class BloomFilterCreatorTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BloomFilterCreatorTest");
- private static int MB_IN_BYTES = 1024 * 1024;
+ @BeforeClass
public void setUp()
throws Exception {
- if (TEMP_DIR.exists()) {
- FileUtils.deleteQuietly(TEMP_DIR);
- }
- TEMP_DIR.mkdir();
- }
-
- @Test
- public void testBloomFilterUtil() {
- // Test against the known results
- Assert.assertEquals(BloomFilterUtil.computeNumBits(1000000, 0.03), 7298441);
- Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.03), 72984409);
- Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.1), 47925292);
-
- Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(1000000, 7298441), 5);
- Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 72984409), 5);
- Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 47925292), 3);
-
- double threshold = 0.001;
- Assert
- .assertTrue(compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(1000000, 5, 7298441), 0.03, threshold));
- Assert.assertTrue(
- compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 5, 72984409), 0.03, threshold));
- Assert.assertTrue(
- compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 3, 47925292), 0.1, threshold));
- }
-
- private boolean compareDouble(double a, double b, double threshold) {
- if (Math.abs(a - b) < threshold) {
- return true;
- }
- return false;
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
}
@Test
public void testBloomFilterCreator()
throws Exception {
- // Create bloom filter directory
- File bloomFilterDir = new File(TEMP_DIR, "bloomFilterDir");
- bloomFilterDir.mkdirs();
-
- // Create a bloom filter and serialize it to a file
+ // Create the bloom filter
int cardinality = 10000;
String columnName = "testColumn";
- BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(bloomFilterDir, columnName, cardinality);
- for (int i = 0; i < 5; i++) {
- bloomFilterCreator.add(Integer.toString(i));
- }
- bloomFilterCreator.close();
-
- // Deserialize the bloom filter and validate
- File bloomFilterFile = new File(bloomFilterDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
-
- try (DataInputStream in = new DataInputStream(new FileInputStream(bloomFilterFile))) {
- BloomFilterType type = BloomFilterType.valueOf(in.readInt());
- int version = in.readInt();
- GuavaOnHeapBloomFilter bloomFilter = new GuavaOnHeapBloomFilter();
-
- Assert.assertEquals(type, bloomFilter.getBloomFilterType());
- Assert.assertEquals(version, bloomFilter.getVersion());
-
- bloomFilter.readFrom(in);
+ try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(TEMP_DIR, columnName, cardinality,
+ new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP))) {
for (int i = 0; i < 5; i++) {
- Assert.assertTrue(bloomFilter.mightContain(Integer.toString(i)));
- }
- for (int j = 5; j < 10; j++) {
- Assert.assertFalse(bloomFilter.mightContain(Integer.toString(j)));
+ bloomFilterCreator.add(Integer.toString(i));
}
+ bloomFilterCreator.seal();
}
- }
-
- @Test
- public void testBloomFilterSize()
- throws Exception {
- int cardinalityArray[] = new int[]{10, 100, 1000, 100000, 100000, 1000000, 5000000, 10000000};
- for (int cardinality : cardinalityArray) {
- FileUtils.deleteQuietly(TEMP_DIR);
- File indexDir = new File(TEMP_DIR, "testBloomFilterSize");
- Preconditions.checkState(indexDir.mkdirs());
-
- String columnName = "testSize";
- BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(indexDir, columnName, cardinality);
- bloomFilterCreator.close();
- File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
-
- try (InputStream inputStream = new FileInputStream(bloomFilterFile)) {
- byte[] bloomFilterBytes = IOUtils.toByteArray(inputStream);
- long actualBloomFilterSize = bloomFilterBytes.length;
- // Check if the size of bloom filter does not go beyond 1MB. Note that guava bloom filter has 11-12 bytes of
- // overhead
- Assert.assertTrue(actualBloomFilterSize < MB_IN_BYTES + 12);
+ // Read the bloom filter
+ File bloomFilterFile = new File(TEMP_DIR, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+ try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(bloomFilterFile);
+ BloomFilterReader bloomFilterReader = BloomFilterReaderFactory.getBloomFilterReader(dataBuffer)) {
+ for (int i = 0; i < 5; i++) {
+ Assert.assertTrue(bloomFilterReader.mightContain(Integer.toString(i)));
+ }
+ for (int i = 5; i < 10; i++) {
+ Assert.assertFalse(bloomFilterReader.mightContain(Integer.toString(i)));
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
similarity index 50%
rename from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
index ac6efd8..d488ece 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java
@@ -16,38 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.bloom;
+package org.apache.pinot.spi.config.table;
-import java.util.HashMap;
-import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.pinot.spi.config.BaseJsonConfig;
-/**
- * Enum for bloom filter type
- */
-public enum BloomFilterType {
- // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value
- // when serializing/deserializing a bloom filter
- GUAVA_ON_HEAP(1);
-
- private int _value;
- private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>();
+public class BloomFilterConfig extends BaseJsonConfig {
+ public static final double DEFAULT_FPP = 0.05;
- BloomFilterType(int value) {
- _value = value;
- }
-
- static {
- for (BloomFilterType pageType : BloomFilterType.values()) {
- _bloomFilterTypeMap.put(pageType._value, pageType);
- }
- }
+ private final double _fpp;
- public static BloomFilterType valueOf(int pageType) {
- return _bloomFilterTypeMap.get(pageType);
+ @JsonCreator
+ public BloomFilterConfig(@JsonProperty(value = "fpp", required = true) double fpp) {
+ Preconditions.checkArgument(fpp > 0.0 && fpp < 1.0, "Invalid fpp (false positive probability): %s", fpp);
+ _fpp = fpp;
}
- public int getValue() {
- return _value;
+ public double getFpp() {
+ return _fpp;
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3081e28..3dd137b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -31,6 +31,7 @@ public class IndexingConfig extends BaseJsonConfig {
private boolean _createInvertedIndexDuringSegmentGeneration;
private List<String> _sortedColumn;
private List<String> _bloomFilterColumns;
+ private Map<String, BloomFilterConfig> _bloomFilterConfigs;
private String _loadMode;
private Map<String, String> _streamConfigs;
private String _segmentFormatVersion;
@@ -105,6 +106,15 @@ public class IndexingConfig extends BaseJsonConfig {
}
@Nullable
+ public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
+ return _bloomFilterConfigs;
+ }
+
+ public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) {
+ _bloomFilterConfigs = bloomFilterConfigs;
+ }
+
+ @Nullable
public String getLoadMode() {
return _loadMode;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org