You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/05/14 15:21:40 UTC
[incubator-pinot] branch master updated: Supporting range queries
using indexes (#5240)
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 602f28a Supporting range queries using indexes (#5240)
602f28a is described below
commit 602f28acae56dd598197a59ac2b425ea40149f08
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Thu May 14 08:21:28 2020 -0700
Supporting range queries using indexes (#5240)
* Adding range index support
* Adding comments
* Make range index store the start of all ranges and the end of the last range
* Adding comments and fixing broken tests
* More comments
Co-authored-by: Xiang Fu <fx...@gmail.com>
---
.../pinot/core/common/BlockMultiValIterator.java | 4 +-
.../org/apache/pinot/core/common/DataSource.java | 6 +
.../indexsegment/mutable/MutableSegmentImpl.java | 4 +-
.../DictionaryBasedMultiValueIterator.java | 108 ++++
...ava => DictionaryBasedSingleValueIterator.java} | 43 +-
.../docvaliterators/MultiValueIterator.java | 25 +
.../core/operator/filter/FilterOperatorUtils.java | 6 +
.../core/operator/filter/RangeFilterOperator.java | 114 +++++
.../filter/predicate/PredicateEvaluator.java | 3 +
.../org/apache/pinot/core/plan/FilterPlanNode.java | 1 -
.../core/segment/creator/impl/V1Constants.java | 1 +
.../creator/impl/inv/RangeIndexCreator.java | 542 +++++++++++++++++++++
.../segment/index/column/ColumnIndexContainer.java | 5 +
.../index/column/PhysicalColumnIndexContainer.java | 20 +-
.../segment/index/datasource/BaseDataSource.java | 11 +-
.../index/datasource/ImmutableDataSource.java | 2 +-
.../index/datasource/MutableDataSource.java | 4 +-
.../segment/index/loader/IndexLoadingConfig.java | 18 +
.../segment/index/loader/SegmentPreProcessor.java | 6 +
.../loader/invertedindex/RangeIndexHandler.java | 169 +++++++
.../segment/index/metadata/SegmentMetadata.java | 2 +
.../index/metadata/SegmentMetadataImpl.java | 5 +
.../segment/index/readers/RangeIndexReader.java | 230 +++++++++
.../pinot/core/segment/store/ColumnIndexType.java | 3 +-
.../core/segment/store/FilePerIndexDirectory.java | 3 +
.../virtualcolumn/VirtualColumnIndexContainer.java | 5 +
.../core/startree/v2/store/StarTreeDataSource.java | 3 +-
.../core/common/RealtimeNoDictionaryTest.java | 12 +-
.../index/creator/RangeIndexCreatorTest.java | 161 ++++++
.../tests/BaseClusterIntegrationTest.java | 8 +-
.../pinot/integration/tests/ClusterTest.java | 42 +-
.../ControllerPeriodicTasksIntegrationTests.java | 9 +-
.../tests/HybridClusterIntegrationTest.java | 2 +-
...ridClusterIntegrationTestCommandLineRunner.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 41 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 8 +-
.../pinot/spi/config/table/IndexingConfig.java | 9 +
.../spi/utils/builder/TableConfigBuilder.java | 7 +
.../tools/scan/query/RangePredicateFilter.java | 8 +
39 files changed, 1596 insertions(+), 56 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
index 69d5da5..68c0fc5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
@@ -24,7 +24,7 @@ package org.apache.pinot.core.common;
*/
public abstract class BlockMultiValIterator implements BlockValIterator {
- public int nextCharVal(int[] charArray) {
+ public int nextCharVal(char[] charArray) {
throw new UnsupportedOperationException();
}
@@ -44,7 +44,7 @@ public abstract class BlockMultiValIterator implements BlockValIterator {
throw new UnsupportedOperationException();
}
- public byte[][] nextBytesArrayVal(byte[][] bytesArrays) {
+ public int nextBytesArrayVal(byte[][] bytesArrays) {
throw new UnsupportedOperationException();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
index 8f33931..492c344 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
@@ -55,6 +55,12 @@ public abstract class DataSource extends BaseOperator {
public abstract InvertedIndexReader getInvertedIndex();
/**
+ * Returns the range index for the column if exists, or {@code null} if not.
+ */
+ @Nullable
+ public abstract InvertedIndexReader getRangeIndex();
+
+ /**
* Returns the bloom filter for the column if exists, or {@code null} if not.
*/
@Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 39d7a30..74af7cc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -114,6 +114,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>();
private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
+ private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
@@ -621,11 +622,12 @@ public class MutableSegmentImpl implements MutableSegment {
DataFileReader forwardIndex = _indexReaderWriterMap.get(column);
BaseMutableDictionary dictionary = _dictionaryMap.get(column);
InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
+ InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
- invertedIndex, bloomFilter, nullValueVector);
+ invertedIndex, rangeIndex, bloomFilter, nullValueVector);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
new file mode 100644
index 0000000..9f3dc2a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.docvaliterators;
+
+import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
+import org.apache.pinot.core.io.reader.ReaderContext;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * Multi-value block iterator for dictionary-encoded columns.
+ * <p>For each document it reads the dictionary ids from the forward index reader and resolves each id to its
+ * actual value through the dictionary. Every {@code nextXxxVal} call consumes one document and returns the number
+ * of values written into the caller-supplied array.
+ */
+public final class DictionaryBasedMultiValueIterator extends BlockMultiValIterator {
+
+  private final SingleColumnMultiValueReader _reader;
+  private final int _numDocs;
+  private final ReaderContext _context;
+  private final Dictionary _dictionary;
+  // Reusable scratch buffer receiving the dictionary ids of the current document.
+  private final int[] _dictIds;
+
+  private int _nextDocId;
+
+  /**
+   * @param reader forward index reader returning the dictionary ids of each document
+   * @param dictionary dictionary used to resolve dictionary ids into values
+   * @param numDocs number of documents to iterate over (doc ids {@code 0} to {@code numDocs - 1})
+   * @param maxLength size of the internal dictionary id buffer; assumed to be at least the maximum number of
+   *                  values in any single document
+   */
+  public DictionaryBasedMultiValueIterator(SingleColumnMultiValueReader reader, Dictionary dictionary, int numDocs,
+      int maxLength) {
+    _reader = reader;
+    _numDocs = numDocs;
+    _context = _reader.createContext();
+    _dictionary = dictionary;
+    _dictIds = new int[maxLength];
+  }
+
+  /**
+   * Reads the dictionary ids of the next document into {@link #_dictIds}, advances to the following document and
+   * returns the number of ids read.
+   */
+  private int readNextDictIds() {
+    return _reader.getIntArray(_nextDocId++, _dictIds, _context);
+  }
+
+  @Override
+  public int nextIntVal(int[] intArray) {
+    int length = readNextDictIds();
+    for (int i = 0; i < length; i++) {
+      intArray[i] = _dictionary.getIntValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextDoubleVal(double[] doubleArray) {
+    int length = readNextDictIds();
+    for (int i = 0; i < length; i++) {
+      doubleArray[i] = _dictionary.getDoubleValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextFloatVal(float[] floatArray) {
+    int length = readNextDictIds();
+    for (int i = 0; i < length; i++) {
+      floatArray[i] = _dictionary.getFloatValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextLongVal(long[] longArray) {
+    int length = readNextDictIds();
+    for (int i = 0; i < length; i++) {
+      longArray[i] = _dictionary.getLongValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
+    int length = readNextDictIds();
+    for (int i = 0; i < length; i++) {
+      bytesArrays[i] = _dictionary.getBytesValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _nextDocId < _numDocs;
+  }
+
+  @Override
+  public void skipTo(int docId) {
+    _nextDocId = docId;
+  }
+
+  @Override
+  public void reset() {
+    _nextDocId = 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
index 85ae7c7..34855e0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
@@ -18,28 +18,57 @@
*/
package org.apache.pinot.core.operator.docvaliterators;
-import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
@SuppressWarnings("unchecked")
-public final class MultiValueIterator extends BlockMultiValIterator {
- private final SingleColumnMultiValueReader _reader;
+public final class DictionaryBasedSingleValueIterator extends BlockSingleValIterator {
+
+ private final SingleColumnSingleValueReader _reader;
private final int _numDocs;
private final ReaderContext _context;
+ private final Dictionary _dictionary;
private int _nextDocId;
- public MultiValueIterator(SingleColumnMultiValueReader reader, int numDocs) {
+ public DictionaryBasedSingleValueIterator(SingleColumnSingleValueReader reader, Dictionary dictionary, int numDocs) {
_reader = reader;
_numDocs = numDocs;
_context = _reader.createContext();
+ _dictionary = dictionary;
+ }
+
+ @Override
+ public int nextIntVal() {
+ return _dictionary.getIntValue(_reader.getInt(_nextDocId++, _context));
+ }
+
+ @Override
+ public long nextLongVal() {
+ return _dictionary.getLongValue(_reader.getInt(_nextDocId++, _context));
+ }
+
+ @Override
+ public float nextFloatVal() {
+ return _dictionary.getFloatValue(_reader.getInt(_nextDocId++, _context));
+ }
+
+ @Override
+ public double nextDoubleVal() {
+ return _dictionary.getDoubleValue(_reader.getInt(_nextDocId++, _context));
+ }
+
+ @Override
+ public String nextStringVal() {
+ return _dictionary.getStringValue(_reader.getInt(_nextDocId++, _context));
}
@Override
- public int nextIntVal(int[] intArray) {
- return _reader.getIntArray(_nextDocId++, intArray, _context);
+ public byte[] nextBytesVal() {
+ return _dictionary.getBytesValue(_reader.getInt(_nextDocId++, _context));
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
index 85ae7c7..4529c84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
@@ -43,6 +43,31 @@ public final class MultiValueIterator extends BlockMultiValIterator {
}
@Override
+ public int nextCharVal(char[] charArray) {
+ return _reader.getCharArray(_nextDocId++, charArray);
+ }
+
+ @Override
+ public int nextDoubleVal(double[] doubleArray) {
+ return _reader.getDoubleArray(_nextDocId++, doubleArray);
+ }
+
+ @Override
+ public int nextFloatVal(float[] floatArray) {
+ return _reader.getFloatArray(_nextDocId++, floatArray);
+ }
+
+ @Override
+ public int nextLongVal(long[] longArray) {
+ return _reader.getLongArray(_nextDocId++, longArray);
+ }
+
+ @Override
+ public int nextBytesArrayVal(byte[][] bytesArrays) {
+ return _reader.getBytesArray(_nextDocId++, bytesArrays);
+ }
+
+ @Override
public boolean hasNext() {
return _nextDocId < _numDocs;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index cb5bbb3..ed134b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -54,6 +54,12 @@ public class FilterOperatorUtils {
Predicate.Type predicateType = predicateEvaluator.getPredicateType();
+ //Only for dictionary encoded columns and offline data sources
+ if (predicateType == Predicate.Type.RANGE && dataSource.getDictionary() != null
+ && dataSource.getRangeIndex() != null) {
+ return new RangeFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
+ }
+
if (predicateType == Predicate.Type.TEXT_MATCH) {
return new TextMatchFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
new file mode 100644
index 0000000..8398bef
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.common.Predicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.MVScanDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Filter operator that evaluates a RANGE predicate with the help of a range index.
+ * <p>The range index groups dictionary ids into ranges and exposes the bitmap of documents of each range. Ranges
+ * strictly between the range containing the start dictionary id and the range containing the end dictionary id
+ * are taken as full matches. The first and last range are treated as partial matches: their documents are
+ * verified by scanning the column values.
+ * <p>Only {@link RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator} is supported.
+ */
+public class RangeFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "RangeFilterOperator";
+
+  private final PredicateEvaluator _predicateEvaluator;
+  private final DataSource _dataSource;
+  private final int _startDocId;
+  private final int _endDocId;
+  private final boolean _exclusive;
+
+  /**
+   * @param predicateEvaluator evaluator of the RANGE predicate; must be neither always-true nor always-false
+   * @param dataSource data source of the filtered column; its range index is used in {@link #getNextBlock()}
+   * @param startDocId first document id to consider
+   * @param endDocId last document id to consider
+   * @throws IllegalArgumentException if the predicate is always true or always false
+   */
+  public RangeFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int startDocId,
+      int endDocId) {
+    // NOTE:
+    // Predicate that is always evaluated as true or false should not be passed into the RangeFilterOperator for
+    // performance concern.
+    // If predicate is always evaluated as true, use MatchAllFilterOperator; if predicate is always evaluated as false,
+    // use EmptyFilterOperator.
+    Preconditions.checkArgument(!predicateEvaluator.isAlwaysTrue() && !predicateEvaluator.isAlwaysFalse(),
+        "%s must not be used for a predicate that is always true or always false", OPERATOR_NAME);
+
+    _predicateEvaluator = predicateEvaluator;
+    _dataSource = dataSource;
+    _startDocId = startDocId;
+    _endDocId = endDocId;
+    _exclusive = predicateEvaluator.isExclusive();
+  }
+
+  /**
+   * Builds the matching document set as the union of the fully matching ranges and the scan-verified documents of
+   * the first and last range.
+   *
+   * @throws IllegalStateException if the predicate evaluator is not an offline dictionary based range evaluator
+   */
+  @Override
+  protected FilterBlock getNextBlock() {
+    // Only dictionary based evaluation is supported for now: the evaluator must expose start/end dictionary ids.
+    Preconditions.checkState(
+        _predicateEvaluator instanceof RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator,
+        "%s only supports OfflineDictionaryBasedRangePredicateEvaluator, got: %s", OPERATOR_NAME,
+        _predicateEvaluator.getClass().getName());
+    RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator evaluator =
+        (RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
+
+    RangeIndexReader rangeIndexReader = (RangeIndexReader) _dataSource.getRangeIndex();
+    int startRangeId = rangeIndexReader.findRangeId(evaluator.getStartDictId());
+    int endRangeId = rangeIndexReader.findRangeId(evaluator.getEndDictId());
+
+    // The first and last range are always treated as partial matches: their documents are only candidates.
+    // TODO: detect when the first/last range is actually a full match and skip scanning it.
+    MutableRoaringBitmap candidateDocIds = new MutableRoaringBitmap();
+    candidateDocIds.or(rangeIndexReader.getDocIds(startRangeId));
+    if (endRangeId != startRangeId) {
+      candidateDocIds.or(rangeIndexReader.getDocIds(endRangeId));
+    }
+    ScanBasedFilterOperator scanBasedFilterOperator =
+        new ScanBasedFilterOperator(_predicateEvaluator, _dataSource, _startDocId, _endDocId);
+    FilterBlockDocIdSet scanBlockDocIdSet = scanBasedFilterOperator.getNextBlock().getBlockDocIdSet();
+    BlockDocIdIterator iterator = scanBlockDocIdSet.iterator();
+
+    // Intersect the candidates with the scan-based evaluation so only documents that really match remain.
+    List<ImmutableRoaringBitmap> bitmapList = new ArrayList<>();
+    if (_dataSource.getDataSourceMetadata().isSingleValue()) {
+      bitmapList.add(((SVScanDocIdIterator) iterator).applyAnd(candidateDocIds));
+    } else {
+      bitmapList.add(((MVScanDocIdIterator) iterator).applyAnd(candidateDocIds));
+    }
+
+    // Every range strictly between the first and the last range is a full match.
+    for (int rangeId = startRangeId + 1; rangeId < endRangeId; rangeId++) {
+      bitmapList.add(rangeIndexReader.getDocIds(rangeId));
+    }
+    ImmutableRoaringBitmap[] bitmaps = bitmapList.toArray(new ImmutableRoaringBitmap[0]);
+    return new FilterBlock(new BitmapDocIdSet(bitmaps, _startDocId, _endDocId, _exclusive) {
+
+      // Report the entries scanned while verifying the partially matching ranges.
+      @Override
+      public long getNumEntriesScannedInFilter() {
+        return scanBlockDocIdSet.getNumEntriesScannedInFilter();
+      }
+    });
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
index a24e492..09c8768 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
@@ -22,6 +22,8 @@ import org.apache.pinot.core.common.Predicate;
public interface PredicateEvaluator {
+
+
/**
* APIs for both dictionary based and raw value based predicate evaluator
*/
@@ -180,4 +182,5 @@ public interface PredicateEvaluator {
* @return Whether the entry matches the predicate
*/
boolean applyMV(byte[][] values, int length);
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 0ddee87..2b1b29f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -117,7 +117,6 @@ public class FilterPlanNode implements PlanNode {
TransformExpressionTree expression = filterQueryTree.getExpression();
if (expression.getExpressionType() == TransformExpressionTree.ExpressionType.FUNCTION) {
-
return new ExpressionFilterOperator(segment, expression, predicate);
} else {
DataSource dataSource = segment.getDataSource(filterQueryTree.getColumn());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index c3316cb..3dc60aa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+ public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
new file mode 100644
index 0000000..ab33ea2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.Swapper;
+import it.unimi.dsi.fastutil.ints.IntComparator;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.query.utils.Pair;
+import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Implementation of {@link InvertedIndexCreator} that builds a range index, staging the intermediate values and
+ * docIds in memory-mapped temporary buffers.
+ * <p>We use 2 passes to create the range index.
+ * <ul>
+ * <li>
+ * In the first pass (adding values phase), each add() call stores the given values into the value buffer
+ * (for multi-valued columns the values are flattened), and stores the corresponding docId in the docId buffer at
+ * the same position.
+ * </li>
+ * <li>
+ * In the second pass (processing values phase), when seal() is called, the value buffer is sorted and the docId
+ * buffer is permuted alongside it, so every docId stays paired with its value. We then walk the sorted values and
+ * split them into ranges of at least _numDocsPerRange values each (except possibly the last one). A range boundary
+ * is only placed between two different values, so all occurrences of a value fall into the same range.
+ * </li>
+ * </ul>
+ */
+public final class RangeIndexCreator implements InvertedIndexCreator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexCreator.class);
+
+  // When true, seal() dumps the content of the temp buffers and the computed ranges to the log (debugging aid).
+  private static final boolean TRACE = false;
+
+  // Version of the range index file format (written in the file header).
+  private static final int RANGE_INDEX_VERSION = 1;
+
+  // Default number of ranges, used when the caller does not request a specific number of ranges.
+  private static final int DEFAULT_NUM_RANGES = 20;
+
+  // Suffixes of the temporary buffer files. Both start with '.' so the files are named "<column>.val.buf" and
+  // "<column>.doc.id.buf" (previously the value buffer file was "<column>val.buf").
+  private static final String VALUE_BUFFER_SUFFIX = ".val.buf";
+  private static final String DOC_ID_VALUE_BUFFER_SUFFIX = ".doc.id.buf";
+
+  // Output file which will hold the range index
+  private final File _rangeIndexFile;
+
+  // File where the input values will be stored. This is a temp file that will be deleted at the end
+  private final File _tempValueBufferFile;
+  // Memory-mapped pinot data buffer over the content of _tempValueBufferFile
+  private PinotDataBuffer _tempValueBuffer;
+  // Simple wrapper over _tempValueBuffer to read/write any Number (INT, LONG, FLOAT, DOUBLE)
+  private NumberValueBuffer _numberValueBuffer;
+
+  // File where the docIds will be stored. Temp file that will be deleted at the end
+  private final File _tempDocIdBufferFile;
+  // Memory-mapped pinot data buffer over the content of _tempDocIdBufferFile
+  private PinotDataBuffer _docIdValueBuffer;
+  // Simple wrapper over _docIdValueBuffer to read/write INT docIds
+  private IntValueBuffer _docIdBuffer;
+
+  // Number of values to index: numDocs for single-value columns, total number of values for multi-value columns
+  private final int _numValues;
+  // Next document id to be added
+  private int _nextDocId;
+  // Next free position in the value/docId buffers, advanced by the multi-value add()
+  private int _nextValueId;
+  // Target number of values per range used when splitting the sorted values into ranges
+  private int _numDocsPerRange;
+  // Type of the values stored in the value buffer
+  private FieldSpec.DataType _valueType;
+
+  /**
+   * Creates a creator that builds the range index of one column into {@code indexDir}.
+   * <p>Values and docIds are first staged in two temporary buffers, backed by files in {@code indexDir}, which are
+   * sized up front for the number of values of the column.
+   *
+   * @param indexDir destination directory of the range index file (also holds the temporary buffer files)
+   * @param fieldSpec field spec of the column to generate the range index for
+   * @param valueType data type of the values: INT if dictionary encoded, or INT, FLOAT, LONG, DOUBLE for raw encoded
+   * @param numRanges number of ranges to create; if not positive, DEFAULT_NUM_RANGES is used
+   * @param numDocsPerRange number of values per range; if negative, it is derived as ceil(numValues / numRanges)
+   * @param numDocs total number of documents
+   * @param numValues total number of values, used for multi-value columns (single-value columns use numDocs)
+   * @throws IOException if creating a temporary buffer fails
+   * @throws UnsupportedOperationException if {@code valueType} is not INT, FLOAT, LONG or DOUBLE
+   */
+  public RangeIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType, int numRanges,
+      int numDocsPerRange, int numDocs, int numValues)
+      throws IOException {
+    _valueType = valueType;
+    String columnName = fieldSpec.getName();
+    _rangeIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    _tempValueBufferFile = new File(indexDir, columnName + VALUE_BUFFER_SUFFIX);
+    _tempDocIdBufferFile = new File(indexDir, columnName + DOC_ID_VALUE_BUFFER_SUFFIX);
+    // Single-value columns have exactly one value per document
+    _numValues = fieldSpec.isSingleValueField() ? numDocs : numValues;
+    int valueSize = valueType.size();
+    try {
+      // Use DEFAULT_NUM_RANGES if numRanges is not set; 0 is included since it would divide by zero below
+      if (numRanges <= 0) {
+        numRanges = DEFAULT_NUM_RANGES;
+      }
+      if (numDocsPerRange < 0) {
+        // Floating-point division so that the ceiling is actually applied (integer division truncates first)
+        _numDocsPerRange = (int) Math.ceil((double) _numValues / numRanges);
+      } else {
+        // Honor the caller-provided value instead of silently leaving _numDocsPerRange at 0
+        _numDocsPerRange = numDocsPerRange;
+      }
+
+      // Value buffer to store the values added via add()
+      _tempValueBuffer = createTempBuffer((long) _numValues * valueSize, _tempValueBufferFile);
+
+      switch (_valueType) {
+        case INT:
+          _numberValueBuffer = new IntValueBuffer(_tempValueBuffer);
+          break;
+        case FLOAT:
+          _numberValueBuffer = new FloatValueBuffer(_tempValueBuffer);
+          break;
+        case LONG:
+          _numberValueBuffer = new LongValueBuffer(_tempValueBuffer);
+          break;
+        case DOUBLE:
+          _numberValueBuffer = new DoubleValueBuffer(_tempValueBuffer);
+          break;
+        default:
+          throw new UnsupportedOperationException("Range index is not supported for columns of data type:" + valueType);
+      }
+
+      // DocId buffer: holds the docId of the value stored at the same position in the value buffer
+      _docIdValueBuffer = createTempBuffer((long) _numValues * Integer.BYTES, _tempDocIdBufferFile);
+      _docIdBuffer = new IntValueBuffer(_docIdValueBuffer);
+    } catch (Exception e) {
+      // Release whatever was allocated before the failure; each buffer is destroyed together with its own file
+      // (the docId file used to be paired with the value buffer, destroying that buffer twice).
+      destroyBuffer(_tempValueBuffer, _tempValueBufferFile);
+      destroyBuffer(_docIdValueBuffer, _tempDocIdBufferFile);
+      throw e;
+    }
+  }
+
+  /**
+   * Adds the value of the next document of a single-value column. For single-value columns the document id is
+   * also the position of the value in both buffers.
+   */
+  @Override
+  public void add(int dictId) {
+    int docId = _nextDocId;
+    _numberValueBuffer.put(docId, dictId);
+    _docIdBuffer.put(docId, docId);
+    _nextDocId = docId + 1;
+  }
+
+  /**
+   * Adds all values of the next document of a multi-value column. The values are flattened: each one takes the
+   * next free position of the value buffer, and the docId buffer records the owning document at the same position.
+   */
+  @Override
+  public void add(int[] dictIds, int length) {
+    int docId = _nextDocId;
+    for (int valueIdx = 0; valueIdx < length; valueIdx++) {
+      int position = _nextValueId;
+      _numberValueBuffer.put(position, dictIds[valueIdx]);
+      _docIdBuffer.put(position, docId);
+      _nextValueId = position + 1;
+    }
+    _nextDocId = docId + 1;
+  }
+
+  /**
+   * Object documents are not supported: this creator only accepts values through the add() overloads.
+   *
+   * @throws IllegalStateException always
+   */
+  @Override
+  public void addDoc(Object document, int docIdCounter) {
+    String message = "Range index creator does not support Object type currently";
+    throw new IllegalStateException(message);
+  }
+
+ /**
+ * Generates the range index file.
+ *
+ * <p>The values are first sorted together with their doc ids, then split into consecutive ranges of the sorted
+ * order. A range is only closed once it holds more than {@code _numDocsPerRange} entries AND the next value differs
+ * from the previous one, so equal values never end up in two different ranges. Finally the header (range boundary
+ * values and bitmap offsets) and one serialized doc id bitmap per range are written to the range index file.
+ *
+ * <p>Sample output by running RangeIndexCreatorTest with TRACE=true and the core logging config set to info:
+ * 15:18:47.330 RangeIndexCreator - Before sorting
+ * 15:18:47.333 RangeIndexCreator - DocIdBuffer [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]
+ * 15:18:47.333 RangeIndexCreator - ValueBuffer [ 3, 0, 0, 0, 3, 1, 3, 0, 2, 4, 4, 2, 4, 3, 2, 1, 0, 2, 0, 3, ]
+ * 15:18:47.371 RangeIndexCreator - After sorting
+ * 15:18:47.371 RangeIndexCreator - DocIdBuffer [ 16, 3, 1, 2, 7, 18, 15, 5, 14, 8, 17, 11, 0, 4, 6, 13, 19, 10, 9, 12, ]
+ * 15:18:47.371 RangeIndexCreator - ValueBuffer [ 0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, ]
+ * 15:18:47.372 RangeIndexCreator - rangeOffsets = [ (0,5) ,(6,7) ,(8,11) ,(12,16) ,(17,19) , ]
+ * 15:18:47.372 RangeIndexCreator - rangeValues = [ (0,0) ,(1,1) ,(2,2) ,(3,3) ,(4,4) , ]
+ *
+ * @throws IOException if the range index file cannot be written; the partially written file is deleted
+ * @throws IllegalStateException if the size of the written file does not match the expected size; the file is
+ * deleted
+ */
+ @Override
+ public void seal()
+ throws IOException {
+ if (TRACE) {
+ LOGGER.info("Before sorting");
+ dump();
+ }
+
+ // Sort the value buffer while keeping the doc id buffer aligned with it: the value -> docId mapping is needed
+ // later to build the bitmap of each range.
+ IntComparator comparator = (i, j) -> {
+ Number val1 = _numberValueBuffer.get(i);
+ Number val2 = _numberValueBuffer.get(j);
+ return _numberValueBuffer.compare(val1, val2);
+ };
+ Swapper swapper = (i, j) -> {
+ Number tempDocId = _docIdBuffer.get(i).intValue();
+ _docIdBuffer.put(i, _docIdBuffer.get(j).intValue());
+ _docIdBuffer.put(j, tempDocId);
+
+ Number tempValue = _numberValueBuffer.get(i);
+ _numberValueBuffer.put(i, _numberValueBuffer.get(j));
+ _numberValueBuffer.put(j, tempValue);
+ };
+ Arrays.quickSort(0, _numValues, comparator, swapper);
+
+ if (TRACE) {
+ LOGGER.info("After sorting");
+ dump();
+ }
+
+ // Compute the ranges over the sorted values as [first, last] positions (inclusive). A range is closed only once
+ // it holds more than _numDocsPerRange entries and the value changes, so equal values never straddle two ranges.
+ List<Pair<Integer, Integer>> ranges = new ArrayList<>();
+ int start = 0;
+ for (int i = 0; i < _numValues; i++) {
+ if (i > start + _numDocsPerRange && comparator.compare(i, i - 1) != 0) {
+ ranges.add(new Pair<>(start, i - 1));
+ start = i;
+ }
+ }
+ ranges.add(new Pair<>(start, _numValues - 1));
+
+ if (TRACE) {
+ dumpRanges(ranges);
+ }
+
+ // RANGE INDEX FILE LAYOUT
+ // HEADER
+ // # VERSION (INT)
+ // # DATA_TYPE (String -> INT (length) (ACTUAL BYTES))
+ // # NUMBER OF RANGES (INT)
+ // <RANGE START VALUE BUFFER> # (R + 1) * ValueSize
+ // Range Start 0,
+ // .........
+ // Range Start R - 1
+ // Range MAX VALUE
+ // <BITMAP OFFSET BUFFER> # (R + 1) * LONG
+ // Bitmap for Range 0 Start Offset
+ // .....
+ // Bitmap for Range R Start Offset (i.e. the end of the last bitmap)
+ // BODY
+ // Bitmap for range 0
+ // Bitmap for range 1
+ // ......
+ // Bitmap for range R - 1
+ long bytesWritten = 0;
+ // The header and the body are written through two independent streams opened on the same file. The header
+ // stream writes from offset 0 while the body stream is positioned right after the fixed-size header, which lets
+ // the offset of each bitmap be appended to the header as soon as that bitmap is serialized.
+ try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(_rangeIndexFile));
+ DataOutputStream header = new DataOutputStream(bos);
+ FileOutputStream fos = new FileOutputStream(_rangeIndexFile);
+ DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(fos))) {
+
+ // VERSION
+ header.writeInt(RANGE_INDEX_VERSION);
+ bytesWritten += Integer.BYTES;
+
+ // Value data type: length-prefixed UTF-8 name
+ byte[] valueDataTypeBytes = _valueType.name().getBytes(UTF_8);
+ header.writeInt(valueDataTypeBytes.length);
+ bytesWritten += Integer.BYTES;
+ header.write(valueDataTypeBytes);
+ bytesWritten += valueDataTypeBytes.length;
+
+ // Number of ranges
+ header.writeInt(ranges.size());
+ bytesWritten += Integer.BYTES;
+
+ // Range start values
+ for (Pair<Integer, Integer> range : ranges) {
+ writeNumberToHeader(header, _numberValueBuffer.get(range.getFirst()));
+ }
+ bytesWritten += ranges.size() * _valueType.size();
+
+ // End value of the last range (i.e. the max value)
+ Number lastRangeEnd = _numberValueBuffer.get(ranges.get(ranges.size() - 1).getSecond());
+ writeNumberToHeader(header, lastRangeEnd);
+ bytesWritten += _valueType.size();
+
+ // One bitmap start offset per range, plus one extra so the length of the last bitmap can be derived.
+ long bitmapOffsetHeaderSize = (ranges.size() + 1) * Long.BYTES;
+ long bitmapOffset = bytesWritten + bitmapOffsetHeaderSize;
+ header.writeLong(bitmapOffset);
+ bytesWritten += Long.BYTES;
+ fos.getChannel().position(bitmapOffset);
+
+ for (Pair<Integer, Integer> range : ranges) {
+ MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+ for (int index = range.getFirst(); index <= range.getSecond(); index++) {
+ bitmap.add(_docIdBuffer.get(index).intValue());
+ }
+ int sizeInBytes = bitmap.serializedSizeInBytes();
+ bitmapOffset += sizeInBytes;
+
+ // Guard against overflow of the running offset
+ Preconditions.checkState(bitmapOffset > 0, "Range index file: %s is too large, bitmap offset overflowed",
+ _rangeIndexFile);
+
+ // Offset where the next bitmap starts (end of this one)
+ header.writeLong(bitmapOffset);
+ bytesWritten += Long.BYTES;
+
+ byte[] bytes = new byte[sizeInBytes];
+ bitmap.serialize(ByteBuffer.wrap(bytes));
+ dataOutputStream.write(bytes);
+ bytesWritten += bytes.length;
+ }
+ } catch (IOException | RuntimeException e) {
+ // Do not leave a partially written index file behind
+ FileUtils.deleteQuietly(_rangeIndexFile);
+ throw e;
+ }
+
+ long fileLength = _rangeIndexFile.length();
+ if (bytesWritten != fileLength) {
+ FileUtils.deleteQuietly(_rangeIndexFile);
+ throw new IllegalStateException(
+ "Length of range index file: " + fileLength + " does not match the number of bytes written: " + bytesWritten);
+ }
+ }
+
+ /**
+ * Writes {@code number} to the header stream using the binary encoding of the indexed value type.
+ *
+ * @throws RuntimeException if the value type is not INT, LONG, FLOAT or DOUBLE
+ */
+ private void writeNumberToHeader(DataOutputStream header, Number number)
+ throws IOException {
+ switch (_valueType) {
+ case DOUBLE:
+ header.writeDouble(number.doubleValue());
+ return;
+ case FLOAT:
+ header.writeFloat(number.floatValue());
+ return;
+ case LONG:
+ header.writeLong(number.longValue());
+ return;
+ case INT:
+ header.writeInt(number.intValue());
+ return;
+ default:
+ throw new RuntimeException("Range index not supported for dataType: " + _valueType);
+ }
+ }
+
+ /**
+ * Logs, for every range, its [first, last] positions in the sorted buffers and the corresponding [first, last]
+ * values. Only used for tracing.
+ */
+ private void dumpRanges(List<Pair<Integer, Integer>> ranges) {
+ StringBuilder offsets = new StringBuilder("[ ");
+ StringBuilder values = new StringBuilder("[ ");
+ for (Pair<Integer, Integer> range : ranges) {
+ int first = range.getFirst();
+ int last = range.getSecond();
+ offsets.append('(').append(first).append(',').append(last).append(") ,");
+ values.append('(').append(_numberValueBuffer.get(first)).append(',').append(_numberValueBuffer.get(last))
+ .append(") ,");
+ }
+ offsets.append(" ]");
+ values.append(" ]");
+ LOGGER.info("rangeOffsets = " + offsets);
+ LOGGER.info("rangeValues = " + values);
+ }
+
+ /**
+ * Releases the temporary value and doc id buffers: each buffer is closed and its backing file deleted (see
+ * {@code destroyBuffer}).
+ */
+ @Override
+ public void close()
+ throws IOException {
+ DataBufferAndFile valueBuffer = new DataBufferAndFile(_tempValueBuffer, _tempValueBufferFile);
+ DataBufferAndFile docIdBuffer = new DataBufferAndFile(_docIdValueBuffer, _tempDocIdBufferFile);
+ org.apache.pinot.common.utils.FileUtils.close(valueBuffer, docIdBuffer);
+ }
+
+ /**
+ * Pairs a temporary buffer with its backing file so both can be released through {@link Closeable#close()},
+ * which delegates to {@code destroyBuffer}.
+ */
+ private class DataBufferAndFile implements Closeable {
+ private final PinotDataBuffer _buffer;
+ private final File _backingFile;
+
+ DataBufferAndFile(PinotDataBuffer buffer, File backingFile) {
+ _buffer = buffer;
+ _backingFile = backingFile;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ destroyBuffer(_buffer, _backingFile);
+ }
+ }
+
+ /**
+ * Logs the content of the doc id buffer and of the value buffer, one line each. Only used for tracing.
+ */
+ void dump() {
+ LOGGER.info(bufferToString("DocIdBuffer [ ", _docIdBuffer, "]"));
+ LOGGER.info(bufferToString("ValueBuffer [ ", _numberValueBuffer, "] "));
+ }
+
+ /** Renders the first {@code _numValues} elements of {@code buffer} between {@code prefix} and {@code suffix}. */
+ private String bufferToString(String prefix, NumberValueBuffer buffer, String suffix) {
+ StringBuilder text = new StringBuilder(prefix);
+ for (int i = 0; i < _numValues; i++) {
+ text.append(buffer.get(i)).append(", ");
+ }
+ return text.append(suffix).toString();
+ }
+
+ /**
+ * Memory-maps {@code size} bytes of {@code mmapFile}, starting at offset 0 with native byte order, to back a
+ * temporary buffer of this creator.
+ */
+ private PinotDataBuffer createTempBuffer(long size, File mmapFile)
+ throws IOException {
+ String description = "RangeIndexCreator: temp buffer";
+ return PinotDataBuffer.mapFile(mmapFile, false, 0, size, PinotDataBuffer.NATIVE_ORDER, description);
+ }
+
+ /**
+ * Closes {@code buffer} and deletes its backing file if it still exists. Does nothing when {@code buffer} is
+ * {@code null} (e.g. it was never created).
+ */
+ private void destroyBuffer(PinotDataBuffer buffer, File mmapFile)
+ throws IOException {
+ if (buffer == null) {
+ return;
+ }
+ buffer.close();
+ if (mmapFile.exists()) {
+ FileUtils.forceDelete(mmapFile);
+ }
+ }
+
+ /**
+ * Fixed-width numeric array addressed by element position. The implementations in this class store their elements
+ * in a {@link PinotDataBuffer}.
+ */
+ interface NumberValueBuffer {
+
+ /** Stores {@code value} at element {@code position}. */
+ void put(int position, Number value);
+
+ /** Returns the element stored at {@code position}. */
+ Number get(int position);
+
+ /** Compares two values using this buffer's element type; returns a negative, zero or positive int. */
+ int compare(Number first, Number second);
+ }
+
+ /**
+ * {@link NumberValueBuffer} storing 4-byte int values.
+ */
+ class IntValueBuffer implements NumberValueBuffer {
+
+ private final PinotDataBuffer _buffer;
+
+ IntValueBuffer(PinotDataBuffer buffer) {
+ _buffer = buffer;
+ }
+
+ @Override
+ public void put(int position, Number value) {
+ // Compute the byte offset in long arithmetic: position << 2 overflows int once position reaches 2^29
+ _buffer.putInt((long) position << 2, value.intValue());
+ }
+
+ @Override
+ public Number get(int position) {
+ return _buffer.getInt((long) position << 2);
+ }
+
+ @Override
+ public int compare(Number val1, Number val2) {
+ return Integer.compare(val1.intValue(), val2.intValue());
+ }
+ }
+
+ /**
+ * {@link NumberValueBuffer} storing 8-byte long values.
+ */
+ class LongValueBuffer implements NumberValueBuffer {
+
+ private final PinotDataBuffer _buffer;
+
+ LongValueBuffer(PinotDataBuffer buffer) {
+ _buffer = buffer;
+ }
+
+ @Override
+ public void put(int position, Number value) {
+ // Store the full 64-bit value (putInt/intValue would silently truncate it); offset computed in long
+ _buffer.putLong((long) position << 3, value.longValue());
+ }
+
+ @Override
+ public Number get(int position) {
+ return _buffer.getLong((long) position << 3);
+ }
+
+ @Override
+ public int compare(Number val1, Number val2) {
+ return Long.compare(val1.longValue(), val2.longValue());
+ }
+ }
+
+ /**
+ * {@link NumberValueBuffer} storing 4-byte float values.
+ */
+ class FloatValueBuffer implements NumberValueBuffer {
+
+ private final PinotDataBuffer _buffer;
+
+ FloatValueBuffer(PinotDataBuffer buffer) {
+ _buffer = buffer;
+ }
+
+ @Override
+ public void put(int position, Number value) {
+ // Store the float itself (intValue would drop the fraction); offset computed in long
+ _buffer.putFloat((long) position << 2, value.floatValue());
+ }
+
+ @Override
+ public Number get(int position) {
+ return _buffer.getFloat((long) position << 2);
+ }
+
+ @Override
+ public int compare(Number val1, Number val2) {
+ // Comparing longValue() would treat e.g. 0.1f and 0.9f as equal
+ return Float.compare(val1.floatValue(), val2.floatValue());
+ }
+ }
+
+ /**
+ * {@link NumberValueBuffer} storing 8-byte double values.
+ */
+ class DoubleValueBuffer implements NumberValueBuffer {
+
+ private final PinotDataBuffer _buffer;
+
+ DoubleValueBuffer(PinotDataBuffer buffer) {
+ _buffer = buffer;
+ }
+
+ @Override
+ public void put(int position, Number value) {
+ // Store the double itself (putInt/intValue would truncate it); offset computed in long
+ _buffer.putDouble((long) position << 3, value.doubleValue());
+ }
+
+ @Override
+ public Number get(int position) {
+ return _buffer.getDouble((long) position << 3);
+ }
+
+ @Override
+ public int compare(Number val1, Number val2) {
+ // Comparing longValue() would treat e.g. 0.1 and 0.9 as equal
+ return Double.compare(val1.doubleValue(), val2.doubleValue());
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
index 531c3fc..ab9fbfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -41,6 +41,11 @@ public interface ColumnIndexContainer {
InvertedIndexReader getInvertedIndex();
/**
+ * Returns the range index for the column, or {@code null} if it does not exist.
+ */
+ InvertedIndexReader getRangeIndex();
+
+ /**
* Returns the dictionary for the column, or {@code null} if it does not exist.
*/
Dictionary getDictionary();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index aff965d..eb33185 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.BytesDictionary;
import org.apache.pinot.core.segment.index.readers.DoubleDictionary;
@@ -61,6 +62,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final DataFileReader _forwardIndex;
private final InvertedIndexReader _invertedIndex;
+ private final InvertedIndexReader _rangeIndex;
private final BaseImmutableDictionary _dictionary;
private final BloomFilterReader _bloomFilterReader;
private final NullValueVectorReaderImpl _nullValueVectorReader;
@@ -70,11 +72,13 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
throws IOException {
String columnName = metadata.getColumnName();
boolean loadInvertedIndex = false;
+ boolean loadRangeIndex = false;
boolean loadTextIndex = false;
boolean loadOnHeapDictionary = false;
boolean loadBloomFilter = false;
if (indexLoadingConfig != null) {
loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
+ loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
@@ -108,6 +112,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
new SortedIndexReaderImpl(fwdIndexBuffer, metadata.getCardinality());
_forwardIndex = sortedIndexReader;
_invertedIndex = sortedIndexReader;
+ _rangeIndex = null;
return;
} else {
// Unsorted
@@ -124,14 +129,21 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
_invertedIndex =
new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
metadata.getCardinality());
+ _rangeIndex = null;
+ } else if (loadRangeIndex) {
+ _invertedIndex = null;
+ _rangeIndex =
+ new RangeIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX));
} else {
_invertedIndex = null;
+ _rangeIndex = null;
}
} else {
// Raw index
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
_dictionary = null;
_bloomFilterReader = null;
+ _rangeIndex = null;
if (loadTextIndex) {
Map<String, Map<String, String>> columnProperties = indexLoadingConfig.getColumnProperties();
_invertedIndex = new LuceneTextIndexReader(columnName, segmentIndexDir, metadata.getTotalDocs(),
@@ -153,6 +165,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
}
+ /**
+ * Returns the range index reader loaded for this column, or {@code null} when none was loaded.
+ */
@Override
+ public InvertedIndexReader getRangeIndex() {
+ return this._rangeIndex;
+ }
+
+ @Override
public BaseImmutableDictionary getDictionary() {
return _dictionary;
}
@@ -167,7 +184,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
return _nullValueVectorReader;
}
- private static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
+ //TODO: move this to a DictionaryLoader class
+ public static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
boolean loadOnHeap) {
FieldSpec.DataType dataType = metadata.getDataType();
if (loadOnHeap) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
index 4fc99cb..fc32487 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
@@ -39,17 +39,20 @@ public abstract class BaseDataSource extends DataSource {
private final DataFileReader _forwardIndex;
private final Dictionary _dictionary;
private final InvertedIndexReader _invertedIndex;
+ private final InvertedIndexReader _rangeIndex;
private final BloomFilterReader _bloomFilter;
private final NullValueVectorReader _nullValueVector;
private final String _operatorName;
public BaseDataSource(DataSourceMetadata dataSourceMetadata, DataFileReader forwardIndex,
@Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
- @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector, String operatorName) {
+ @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader bloomFilter,
+ @Nullable NullValueVectorReader nullValueVector, String operatorName) {
_dataSourceMetadata = dataSourceMetadata;
_forwardIndex = forwardIndex;
_dictionary = dictionary;
_invertedIndex = invertedIndex;
+ _rangeIndex = rangeIndex;
_bloomFilter = bloomFilter;
_nullValueVector = nullValueVector;
_operatorName = operatorName;
@@ -79,6 +82,12 @@ public abstract class BaseDataSource extends DataSource {
+ /**
+ * Returns the range index of this data source, or {@code null} if it has none.
+ */
@Nullable
@Override
+ public InvertedIndexReader getRangeIndex() {
+ return this._rangeIndex;
+ }
+
+ @Nullable
+ @Override
public BloomFilterReader getBloomFilter() {
return _bloomFilter;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
index a7009a8..b4b0f5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
@@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource {
public ImmutableDataSource(ColumnMetadata columnMetadata, ColumnIndexContainer columnIndexContainer) {
super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(),
- columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(),
+ columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(),
+ columnIndexContainer.getRangeIndex(),
columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector(),
OPERATOR_NAME_PREFIX + columnMetadata.getColumnName());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index cf8e8bb..70610da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -40,10 +40,10 @@ public class MutableDataSource extends BaseDataSource {
public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
@Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex,
- @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
+ @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
+ @Nullable InvertedIndexReader rangeIndex,
@Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
- partitionId), forwardIndex, dictionary, invertedIndex, bloomFilter, nullValueVector,
+ partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
OPERATOR_NAME_PREFIX + fieldSpec.getName());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 8b85d73..6dac2ec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -46,6 +46,7 @@ public class IndexLoadingConfig {
private List<String> _sortedColumns = Collections.emptyList();
private Set<String> _invertedIndexColumns = new HashSet<>();
private Set<String> _textIndexColumns = new HashSet<>();
+ private Set<String> _rangeIndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
private Map<String, String> _noDictionaryConfig = new HashMap<>();
private Set<String> _varLengthDictionaryColumns = new HashSet<>();
@@ -86,6 +87,11 @@ public class IndexLoadingConfig {
_invertedIndexColumns.addAll(invertedIndexColumns);
}
+ List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
+ if (rangeIndexColumns != null) {
+ _rangeIndexColumns.addAll(rangeIndexColumns);
+ }
+
List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
if (bloomFilterColumns != null) {
_bloomFilterColumns.addAll(bloomFilterColumns);
@@ -204,6 +210,10 @@ public class IndexLoadingConfig {
@Nonnull
public Set<String> getInvertedIndexColumns() {
return _invertedIndexColumns;
+ }
+
+ /**
+ * Returns the names of the columns configured for a range index. The returned set is this config's internal set,
+ * not a copy.
+ */
+ @Nonnull
+ public Set<String> getRangeIndexColumns() {
+ return _rangeIndexColumns;
}
@Nonnull
@@ -235,6 +245,14 @@ public class IndexLoadingConfig {
}
/**
+ * Overrides the set of range index columns. For tests only.
+ *
+ * @param rangeIndexColumns the set to use; it is stored as-is, not copied
+ */
+ @VisibleForTesting
+ public void setRangeIndexColumns(@Nonnull Set<String> rangeIndexColumns) {
+ this._rangeIndexColumns = rangeIndexColumns;
+ }
+
+ /**
* Used directly from text search unit test code since the test code
* doesn't really have a table config and is directly testing the
* query execution code of text search using data from generated segments
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 564321b..89432ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.store.SegmentDirectory;
@@ -99,6 +100,11 @@ public class SegmentPreProcessor implements AutoCloseable {
new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
invertedIndexHandler.createInvertedIndices();
+ // Create column inverted indices according to the index config.
+ RangeIndexHandler rangeIndexHandler =
+ new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+ rangeIndexHandler.createRangeIndices();
+
Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
if (textIndexColumns.size() > 0) {
TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
new file mode 100644
index 0000000..e7685a7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Creates missing range indexes, while a segment is pre-processed, for the columns listed in
+ * {@link IndexLoadingConfig#getRangeIndexColumns()}.
+ *
+ * <p>Only unsorted, dictionary-encoded columns get a range index, and the index is built over dictionary ids. A
+ * marker file ({@code <column>.range.inprogress}) makes creation restartable: if it exists, the previous attempt was
+ * interrupted and any partially written index file is discarded before re-creating it.
+ */
+public class RangeIndexHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexHandler.class);
+
+ private final File _indexDir;
+ private final SegmentDirectory.Writer _segmentWriter;
+ private final String _segmentName;
+ private final SegmentVersion _segmentVersion;
+ private final Set<ColumnMetadata> _rangeIndexColumns = new HashSet<>();
+
+ public RangeIndexHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
+ @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+ _indexDir = indexDir;
+ _segmentWriter = segmentWriter;
+ _segmentName = segmentMetadata.getName();
+ _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+ // Skip columns that are missing from the segment, and do not create range index for sorted columns
+ for (String column : indexLoadingConfig.getRangeIndexColumns()) {
+ ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+ if (columnMetadata != null && !columnMetadata.isSorted()) {
+ _rangeIndexColumns.add(columnMetadata);
+ }
+ }
+ }
+
+ /**
+ * Creates the range index for every eligible configured column that does not have one yet.
+ */
+ public void createRangeIndices()
+ throws IOException {
+ for (ColumnMetadata columnMetadata : _rangeIndexColumns) {
+ createRangeIndexForColumn(columnMetadata);
+ }
+ }
+
+ private void createRangeIndexForColumn(ColumnMetadata columnMetadata)
+ throws IOException {
+ String column = columnMetadata.getColumnName();
+ // Range index is supported only for dictionary encoded columns for now
+ if (!columnMetadata.hasDictionary()) {
+ LOGGER.warn("Skipping creation of Range index for column:{}. It's only supported for dictionary encoded columns",
+ column);
+ return;
+ }
+
+ File inProgress = new File(_indexDir, column + ".range.inprogress");
+ File rangeIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ if (_segmentWriter.hasIndexFor(column, ColumnIndexType.RANGE_INDEX)) {
+ // Skip creating range index if already exists.
+ LOGGER.info("Found Range index for segment: {}, column: {}", _segmentName, column);
+ return;
+ }
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run got interrupted.
+ // Remove range index file if exists.
+ // For v1 and v2, it's the actual range index. For v3, it's the temporary range index.
+ FileUtils.deleteQuietly(rangeIndexFile);
+ }
+
+ // Create new range index for the column.
+ LOGGER.info("Creating new range index for segment: {}, column: {}", _segmentName, column);
+ handleDictionaryBasedColumn(columnMetadata);
+
+ // For v3, write the generated range index file into the single file and remove it.
+ if (_segmentVersion == SegmentVersion.v3) {
+ LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
+ }
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created range index for segment: {}, column: {}", _segmentName, column);
+ }
+
+ /**
+ * Builds the range index over the dictionary ids read from the forward index of the column. The dictionary
+ * itself is not needed: dictionary ids are fed to the creator directly as INT values.
+ */
+ private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ throws IOException {
+ int numDocs = columnMetadata.getTotalDocs();
+ try (RangeIndexCreator creator = new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+ FieldSpec.DataType.INT, -1, -1, numDocs, columnMetadata.getTotalNumberOfEntries());
+ DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) {
+ if (columnMetadata.isSingleValue()) {
+ // Single-value column.
+ FixedBitSingleValueReader svFwdIndex = (FixedBitSingleValueReader) fwdIndex;
+ for (int i = 0; i < numDocs; i++) {
+ creator.add(svFwdIndex.getInt(i));
+ }
+ } else {
+ // Multi-value column.
+ SingleColumnMultiValueReader mvFwdIndex = (SingleColumnMultiValueReader) fwdIndex;
+ int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()];
+ for (int i = 0; i < numDocs; i++) {
+ int length = mvFwdIndex.getIntArray(i, dictIds);
+ creator.add(dictIds, length);
+ }
+ }
+ creator.seal();
+ }
+ }
+
+ /**
+ * Opens a fixed-bit forward index reader (single- or multi-value, per the column metadata) over the forward index
+ * buffer of the column.
+ */
+ private DataFileReader getForwardIndexReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter)
+ throws IOException {
+ PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+ int numRows = columnMetadata.getTotalDocs();
+ int numBitsPerValue = columnMetadata.getBitsPerElement();
+ if (columnMetadata.isSingleValue()) {
+ return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+ } else {
+ return new FixedBitMultiValueReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), numBitsPerValue);
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index 8ee589a..d01406c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -96,6 +96,8 @@ public interface SegmentMetadata {
String getBitmapInvertedIndexFileName(String column);
+ /**
+ * Returns the file name of the bitmap range index for the given column.
+ */
+ String getBitmapRangeIndexFileName(String column);
+
String getBloomFilterFileName(String column);
String getNullValueVectorFileName(String column);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 5b58f9e..3bbdcdd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -452,6 +452,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
}
@Override
+ public String getBitmapRangeIndexFileName(String column) {
+ return column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+ }
+
+ @Override
public String getBloomFilterFileName(String column) {
return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java
new file mode 100644
index 0000000..b4d1a77
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reader for a bitmap based range index.
+ *
+ * <p>The buffer is laid out as follows (see the constructor):
+ * <ol>
+ *   <li>version ({@code int})</li>
+ *   <li>length of the value type name ({@code int}), followed by the name bytes (e.g. {@code "INT"})</li>
+ *   <li>number of ranges {@code N} ({@code int})</li>
+ *   <li>{@code N + 1} values of the value type: the start of every range, then the end of the last range</li>
+ *   <li>{@code N + 1} {@code long} offsets: the start of every serialized bitmap, then the end of the last one
+ *   (which must equal the buffer size)</li>
+ *   <li>the serialized {@link ImmutableRoaringBitmap}s, one per range</li>
+ * </ol>
+ */
+public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> {
+  public static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexReader.class);
+
+  private final PinotDataBuffer _buffer;
+  private final int _version;
+  private final FieldSpec.DataType _valueType;
+  private final int _numRanges;
+  // Buffer offset of the bitmap offset table (numRanges + 1 longs)
+  final long _bitmapIndexOffset;
+  private final Number[] _rangeStartArray;
+  private final Number _lastRangeEnd;
+
+  // Lazily built cache of deserialized bitmaps indexed by range id. Both the array and its entries are soft
+  // references, so bitmaps can be reclaimed under memory pressure and are then rebuilt from the buffer.
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps = null;
+
+  /**
+   * Constructs a range index reader on top of the given buffer.
+   *
+   * @param indexDataBuffer data buffer containing the serialized range index
+   * @throws IllegalStateException if the last bitmap offset does not match the buffer size
+   */
+  public RangeIndexReader(PinotDataBuffer indexDataBuffer) {
+    long offset = 0;
+    _buffer = indexDataBuffer;
+    // Read the version
+    _version = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    // Read the value type name (INT, LONG, FLOAT or DOUBLE)
+    int valueTypeBytesLength = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+    byte[] valueTypeBytes = new byte[valueTypeBytesLength];
+    _buffer.copyTo(offset, valueTypeBytes);
+    offset += valueTypeBytesLength;
+    // Decode with an explicit charset instead of the platform default
+    _valueType = FieldSpec.DataType.valueOf(new String(valueTypeBytes, StandardCharsets.UTF_8));
+
+    // Read the number of ranges
+    _numRanges = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+    long rangeArrayStartOffset = offset;
+
+    _rangeStartArray = new Number[_numRanges];
+    // The numRanges + 1 range boundaries are followed by the bitmap offset table, whose last entry marks the end of
+    // the last bitmap
+    _bitmapIndexOffset = offset + (_numRanges + 1) * _valueType.size();
+    final long lastOffset = _buffer.getLong(_bitmapIndexOffset + _numRanges * Long.BYTES);
+
+    Preconditions.checkState(lastOffset == _buffer.size(),
+        "The last offset should be equal to buffer size! Current lastOffset: " + lastOffset + ", buffer size: "
+            + _buffer.size());
+    switch (_valueType) {
+      case INT:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getInt(rangeArrayStartOffset + i * Integer.BYTES);
+        }
+        _lastRangeEnd = _buffer.getInt(rangeArrayStartOffset + _numRanges * Integer.BYTES);
+        break;
+      case LONG:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getLong(rangeArrayStartOffset + i * Long.BYTES);
+        }
+        _lastRangeEnd = _buffer.getLong(rangeArrayStartOffset + _numRanges * Long.BYTES);
+        break;
+      case FLOAT:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getFloat(rangeArrayStartOffset + i * Float.BYTES);
+        }
+        _lastRangeEnd = _buffer.getFloat(rangeArrayStartOffset + _numRanges * Float.BYTES);
+        break;
+      case DOUBLE:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getDouble(rangeArrayStartOffset + i * Double.BYTES);
+        }
+        _lastRangeEnd = _buffer.getDouble(rangeArrayStartOffset + _numRanges * Double.BYTES);
+        break;
+      default:
+        throw new RuntimeException("Range Index Unsupported for dataType:" + _valueType);
+    }
+  }
+
+  /**
+   * Returns the start value of every range, indexed by range id (the end of the last range is not included).
+   */
+  @VisibleForTesting
+  public Number[] getRangeStartArray() {
+    return _rangeStartArray;
+  }
+
+  /**
+   * Returns the bitmap of document ids whose values fall into the given range.
+   *
+   * <p>Deserialized bitmaps are cached behind soft references and rebuilt from the buffer once reclaimed.
+   *
+   * @param rangeId id of the range, in {@code [0, numRanges)}
+   * @return bitmap of the matching document ids
+   */
+  @Override
+  public ImmutableRoaringBitmap getDocIds(int rangeId) {
+    // Read the volatile field once; the cache array may have been reclaimed by the GC
+    SoftReference<SoftReference<ImmutableRoaringBitmap>[]> bitmapsReference = _bitmaps;
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference =
+        bitmapsReference != null ? bitmapsReference.get() : null;
+    if (bitmapArrayReference != null) {
+      // Fast path: return the bitmap without locking if it's still on heap
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
+      if (bitmapReference != null) {
+        ImmutableRoaringBitmap value = bitmapReference.get();
+        if (value != null) {
+          return value;
+        }
+      }
+    } else {
+      bitmapArrayReference = newBitmapArray();
+      _bitmaps = new SoftReference<>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      // Dereference the soft reference only once: a second get() could observe it cleared and return null
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
+      ImmutableRoaringBitmap value = bitmapReference != null ? bitmapReference.get() : null;
+      if (value == null) {
+        value = buildRoaringBitmapForIndex(rangeId);
+        bitmapArrayReference[rangeId] = new SoftReference<>(value);
+      }
+      return value;
+    }
+  }
+
+  /**
+   * Not supported: the range index can only be looked up by range id, see {@link #getDocIds(int)}.
+   *
+   * @throws IllegalStateException always
+   */
+  @Override
+  public ImmutableRoaringBitmap getDocIds(Object value) {
+    // This should not be called from anywhere. If it happens, there is a bug
+    // and that's why we throw illegal state exception
+    throw new IllegalStateException("Range index reader supports lookup only on range id");
+  }
+
+  /**
+   * Creates an empty per-range bitmap cache.
+   */
+  @SuppressWarnings("unchecked")
+  private SoftReference<ImmutableRoaringBitmap>[] newBitmapArray() {
+    return (SoftReference<ImmutableRoaringBitmap>[]) new SoftReference[_numRanges];
+  }
+
+  /**
+   * Deserializes the bitmap for the given range, whose bytes span from its offset to the next range's offset.
+   */
+  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
+    final long currentOffset = getOffset(rangeId);
+    final long nextOffset = getOffset(rangeId + 1);
+    final int bufferLength = (int) (nextOffset - currentOffset);
+
+    // Slice the buffer appropriately for Roaring Bitmap
+    ByteBuffer bb = _buffer.toDirectByteBuffer(currentOffset, bufferLength);
+    return new ImmutableRoaringBitmap(bb);
+  }
+
+  /**
+   * Returns the buffer offset of the serialized bitmap for the given range id. {@code rangeId == numRanges} yields
+   * the end of the last bitmap.
+   */
+  private long getOffset(final int rangeId) {
+    return _buffer.getLong(_bitmapIndexOffset + rangeId * Long.BYTES);
+  }
+
+  /**
+   * Closes the underlying buffer.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    _buffer.close();
+  }
+
+  /**
+   * Finds the id of the range that contains the given value.
+   *
+   * <p>Range {@code i} covers {@code [rangeStart[i], rangeStart[i + 1])}; the last range covers
+   * {@code [rangeStart[numRanges - 1], lastRangeEnd]}.
+   *
+   * @param value value to look up
+   * @return the range id, or {@code -1} if the value is below the first range start or above the last range end
+   */
+  public int findRangeId(int value) {
+    // Widening to long is exact and avoids truncating LONG range boundaries with intValue()
+    return findRangeId((long) value);
+  }
+
+  /**
+   * Finds the id of the range that contains the given value, see {@link #findRangeId(int)}.
+   */
+  public int findRangeId(long value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].longValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.longValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+
+  /**
+   * Finds the id of the range that contains the given value, see {@link #findRangeId(int)}.
+   */
+  public int findRangeId(float value) {
+    // Widening to double is exact and avoids rounding range boundaries with floatValue()
+    return findRangeId((double) value);
+  }
+
+  /**
+   * Finds the id of the range that contains the given value, see {@link #findRangeId(int)}.
+   */
+  public int findRangeId(double value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].doubleValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.doubleValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index bdb3b6a..dcd21df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -24,7 +24,8 @@ public enum ColumnIndexType {
INVERTED_INDEX("inverted_index"),
BLOOM_FILTER("bloom_filter"),
NULLVALUE_VECTOR("nullvalue_vector"),
- TEXT_INDEX("text_index");
+ TEXT_INDEX("text_index"),
+ RANGE_INDEX("range_index");
private final String indexName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index d5e8311..2b1328f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -123,6 +123,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
case INVERTED_INDEX:
filename = metadata.getBitmapInvertedIndexFileName(column);
break;
+ case RANGE_INDEX:
+ filename = metadata.getBitmapRangeIndexFileName(column);
+ break;
case BLOOM_FILTER:
filename = metadata.getBloomFilterFileName(column);
break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index de2af42..c91500d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -52,6 +52,11 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
}
@Override
+ public InvertedIndexReader getRangeIndex() {
+ return null;
+ }
+
+ @Override
public Dictionary getDictionary() {
return _dictionary;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
index da13528..0e6e077 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
import org.apache.pinot.core.segment.index.datasource.BaseDataSource;
import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.spi.data.FieldSpec;
@@ -33,7 +34,7 @@ public class StarTreeDataSource extends BaseDataSource {
public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, SingleColumnSingleValueReader forwardIndex,
@Nullable Dictionary dictionary) {
- super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null,
+ super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null,null, null,
OPERATOR_NAME_PREFIX + fieldSpec.getName());
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 9e85b24..a0a5b95 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,17 +110,17 @@ public class RealtimeNoDictionaryTest {
Map<String, DataSource> dataSourceBlock = new HashMap<>();
dataSourceBlock.put(INT_COL_NAME,
- new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null));
+ new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null,null, null));
dataSourceBlock.put(LONG_COL_NAME,
- new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null));
+ new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null,null, null));
dataSourceBlock.put(FLOAT_COL_NAME,
- new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null));
+ new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null,null, null));
dataSourceBlock.put(DOUBLE_COL_NAME,
- new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null));
+ new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null,null, null));
dataSourceBlock.put(STRING_COL_NAME,
- new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null));
+ new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null,null, null));
dataSourceBlock.put(BYTES_COL_NAME,
- new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null));
+ new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null,null, null));
return new DataFetcher(dataSourceBlock);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
new file mode 100644
index 0000000..0a4c171
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Class for testing Range index.
+ */
+public class RangeIndexCreatorTest {
+
+  /**
+   * Builds a range index over random INT values, then verifies it both by parsing the index file directly and
+   * through {@link RangeIndexReader}.
+   */
+  @Test
+  public void testInt()
+      throws Exception {
+    File indexDir = new File(System.getProperty("java.io.tmpdir") + "/testRangeIndex");
+    indexDir.mkdirs();
+    FieldSpec fieldSpec = new MetricFieldSpec();
+    fieldSpec.setDataType(FieldSpec.DataType.INT);
+    String columnName = "latency";
+    fieldSpec.setName(columnName);
+    int cardinality = 20;
+    int numDocs = 1000;
+    int numValues = 1000;
+    RangeIndexCreator creator =
+        new RangeIndexCreator(indexDir, fieldSpec, FieldSpec.DataType.INT, -1, -1, numDocs, numValues);
+    Random r = new Random();
+    Number[] values = new Number[numValues];
+    for (int i = 0; i < numDocs; i++) {
+      int val = r.nextInt(cardinality);
+      creator.add(val);
+      values[i] = val;
+    }
+    creator.seal();
+
+    File rangeIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+
+    // Test the raw buffer format
+    testRangeIndexBufferFormat(values, rangeIndexFile);
+
+    // Test using the reader
+    PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile);
+    RangeIndexReader rangeIndexReader = new RangeIndexReader(pinotDataBuffer);
+    try {
+      Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
+      for (int rangeId = 0; rangeId < rangeStartArray.length; rangeId++) {
+        ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
+        for (int docId : bitmap.toArray()) {
+          checkInt(rangeStartArray, rangeId, values, docId);
+        }
+      }
+    } finally {
+      // Closing the reader also closes the underlying buffer
+      rangeIndexReader.close();
+    }
+  }
+
+  /**
+   * Asserts that the value of {@code docId} is at least the start of range {@code rangeId} and, unless it is the
+   * last range, below the start of the next range.
+   */
+  private void checkInt(Number[] rangeStartArray, int rangeId, Number[] values, int docId) {
+    int value = values[docId].intValue();
+    String message = "rangestart:" + rangeStartArray[rangeId] + " value:" + values[docId];
+    Assert.assertTrue(rangeStartArray[rangeId].intValue() <= value, message);
+    if (rangeId != rangeStartArray.length - 1) {
+      Assert.assertTrue(value < rangeStartArray[rangeId + 1].intValue(), message);
+    }
+  }
+
+  /**
+   * Parses the range index file directly and asserts that every document in a range's bitmap has a value inside
+   * that range; values in the last range must also not exceed the stored end of the last range.
+   */
+  private void testRangeIndexBufferFormat(Number[] values, File rangeIndexFile)
+      throws IOException {
+    try (DataInputStream dis = new DataInputStream(new FileInputStream(rangeIndexFile))) {
+      // Skip the version
+      dis.readInt();
+      int valueTypeBytesLength = dis.readInt();
+
+      // readFully() instead of read(), which may return fewer bytes than requested
+      byte[] valueTypeBytes = new byte[valueTypeBytesLength];
+      dis.readFully(valueTypeBytes);
+      String name = new String(valueTypeBytes, StandardCharsets.UTF_8);
+      FieldSpec.DataType dataType = FieldSpec.DataType.valueOf(name);
+
+      int numRanges = dis.readInt();
+
+      Number[] rangeStart = new Number[numRanges];
+      Number rangeEnd;
+
+      switch (dataType) {
+        case INT:
+          for (int i = 0; i < numRanges; i++) {
+            rangeStart[i] = dis.readInt();
+          }
+          rangeEnd = dis.readInt();
+          break;
+        case LONG:
+          for (int i = 0; i < numRanges; i++) {
+            rangeStart[i] = dis.readLong();
+          }
+          rangeEnd = dis.readLong();
+          break;
+        case FLOAT:
+          for (int i = 0; i < numRanges; i++) {
+            rangeStart[i] = dis.readFloat();
+          }
+          rangeEnd = dis.readFloat();
+          break;
+        case DOUBLE:
+          for (int i = 0; i < numRanges; i++) {
+            rangeStart[i] = dis.readDouble();
+          }
+          rangeEnd = dis.readDouble();
+          break;
+        default:
+          throw new IllegalStateException("Unsupported range index data type: " + dataType);
+      }
+
+      long[] rangeBitmapOffsets = new long[numRanges + 1];
+      for (int i = 0; i <= numRanges; i++) {
+        rangeBitmapOffsets[i] = dis.readLong();
+      }
+      for (int i = 0; i < numRanges; i++) {
+        byte[] bytes = new byte[(int) (rangeBitmapOffsets[i + 1] - rangeBitmapOffsets[i])];
+        dis.readFully(bytes);
+        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes));
+        for (int docId : bitmap.toArray()) {
+          int value = values[docId].intValue();
+          String message = "rangestart:" + rangeStart[i] + " value:" + values[docId];
+          Assert.assertTrue(rangeStart[i].intValue() <= value, message);
+          if (i != numRanges - 1) {
+            Assert.assertTrue(value < rangeStart[i + 1].intValue(), message);
+          } else {
+            Assert.assertTrue(value <= rangeEnd.intValue(), message + " rangeend:" + rangeEnd);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ced3014..4e74c42 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
+ private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Arrays.asList("Origin");
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
@@ -172,6 +173,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
@Nullable
+ protected List<String> getRangeIndexColumns() {
+ return DEFAULT_RANGE_INDEX_COLUMNS;
+ }
+
+ @Nullable
protected List<String> getRawIndexColumns() {
return DEFAULT_RAW_INDEX_COLUMNS;
}
@@ -420,7 +426,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
addRealtimeTable(tableName, useLLC, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
- getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
+ getBloomFilterIndexColumns(), getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
numReplicas);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 3c0ab89..19fe79c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -293,40 +293,43 @@ public abstract class ClusterTest extends ControllerTest {
protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
throws Exception {
- addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null);
+ addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null, null);
}
protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
- String sortedColumn)
+ List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
+ SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
throws Exception {
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn, "daily");
+ invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
+ sortedColumn, "daily");
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
}
protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
- String sortedColumn)
+ List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
+ SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
throws Exception {
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn, "daily");
+ invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
+ sortedColumn, "daily");
sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonString());
}
private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig,
- SegmentPartitionConfig segmentPartitionConfig, String sortedColumn, String segmentPushFrequency) {
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+ TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig, String sortedColumn,
+ String segmentPushFrequency) {
return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
.setTimeType(timeType).setSegmentPushFrequency(segmentPushFrequency).setNumReplicas(1)
.setBrokerTenant(brokerTenant).setServerTenant(serverTenant).setLoadMode(loadMode)
.setSegmentVersion(segmentVersion.toString()).setInvertedIndexColumns(invertedIndexColumns)
- .setBloomFilterColumns(bloomFilterColumns).setTaskConfig(taskConfig)
+ .setBloomFilterColumns(bloomFilterColumns).setRangeIndexColumns(rangeIndexColumns).setTaskConfig(taskConfig)
.setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn).build();
}
@@ -377,19 +380,19 @@ public abstract class ClusterTest extends ControllerTest {
protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName)
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+ List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName)
throws Exception {
addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
+ bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
}
protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+ List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
throws Exception {
addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
@@ -480,14 +483,15 @@ public abstract class ClusterTest extends ControllerTest {
protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName, SegmentPartitionConfig segmentPartitionConfig)
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+ List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName,
+ SegmentPartitionConfig segmentPartitionConfig)
throws Exception {
addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
- invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
+ invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig, sortedColumn);
addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
- bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
+ bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
}
protected JsonNode getDebugInfo(final String uri)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index bb343f5..6050314 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -169,7 +169,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
*/
private void setupOfflineTable(String table)
throws Exception {
- addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
+ addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null,
+ null);
}
/**
@@ -194,7 +195,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
String timeType = outgoingTimeUnit.toString();
addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null,
- null, null, null);
+ null, null, null, null);
ExecutorService executor = Executors.newCachedThreadPool();
ClusterIntegrationTestUtils
@@ -225,8 +226,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
- getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
- getTaskConfig(), getStreamConsumerFactoryClassName());
+ getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+ getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index aae6005..e9e8e47 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -139,7 +139,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
- TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+ TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRangeIndexColumns(),
getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index dba6e22..8935966 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -300,7 +300,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
String timeType = outgoingTimeUnit.toString();
addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
_realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
- _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName(), null);
+ _sortedColumn, _invertedIndexColumns, null, null, null, null, getStreamConsumerFactoryClassName(), null);
// Upload all segments
uploadSegments(getTableName(), _tarDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 5ccc832..db3bd93 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -72,6 +72,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ // For range index triggering test
+ private static final List<String> UPDATED_RANGE_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
+ private static final String TEST_UPDATED_RANGE_INDEX_QUERY =
+ "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime > 305";
+
private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
@@ -127,7 +132,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Create the table
addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
- getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+ getBloomFilterIndexColumns(), getRangeIndexColumns(), getTaskConfig(), null, null);
// Upload all segments
uploadSegments(getTableName(), _tarDir);
@@ -282,7 +287,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
- UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig(), null, null);
+ UPDATED_INVERTED_INDEX_COLUMNS, null, null, getTaskConfig(), null, null);
sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
@@ -307,7 +312,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
- UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig(), null, null);
+ UPDATED_BLOOM_FILTER_COLUMNS, null, getTaskConfig(), null, null);
sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
@@ -320,6 +325,40 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
} catch (Exception e) {
throw new RuntimeException(e);
}
+ }, 600_000L, "Failed to generate bloom index");
+ }
+
+ /**
+ * Adds a range index on DivActualElapsedTime via a table config update and segment reload, then checks that
+ * the range query returns the same count while scanning fewer entries in the filter than before.
+ */
+ @Test
+ public void testRangeIndexTriggering()
+ throws Exception {
+ final long numTotalDocs = getCountStarResult();
+ JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+ System.out.println("Before queryResponse = " + queryResponse);
+ assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
+ long beforeCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+
+ // Update table config and trigger reload
+ updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null, null,
+ UPDATED_RANGE_INDEX_COLUMNS, getTaskConfig(), null, null);
+
+ sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+ System.out.println("After queryResponse = " + queryResponse1);
+ assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+ long afterCount = queryResponse1.get("aggregationResults").get(0).get("value").asLong();
+ // After reload the count must be unchanged, and with the range index enabled the filter
+ // should scan fewer entries than numTotalDocs.
+ return beforeCount == afterCount && queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
- }, 600_000L, "Failed to generate inverted index");
+ }, 600_000L, "Failed to generate range index");
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 6c849dc..3416164 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -84,9 +84,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Add 3 offline tables, where 2 of them have TestTask enabled
TableTaskConfig taskConfig =
new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
- addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
- addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
- addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null);
+ addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
+ null);
+ addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
+ null);
+ addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null, null);
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 162c520..b2e0ce2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -26,6 +26,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
public class IndexingConfig extends BaseJsonConfig {
private List<String> _invertedIndexColumns;
+ private List<String> _rangeIndexColumns;
private boolean _autoGeneratedInvertedIndex;
private boolean _createInvertedIndexDuringSegmentGeneration;
private List<String> _sortedColumn;
@@ -59,6 +60,20 @@ public class IndexingConfig extends BaseJsonConfig {
_invertedIndexColumns = invertedIndexColumns;
}
+ /**
+ * Returns the columns configured for range indexing, or {@code null} if never set.
+ */
+ public List<String> getRangeIndexColumns() {
+ return _rangeIndexColumns;
+ }
+
+ /**
+ * Sets the columns configured for range indexing. The list reference is stored as-is, without a copy.
+ */
+ public void setRangeIndexColumns(List<String> rangeIndexColumns) {
+ _rangeIndexColumns = rangeIndexColumns;
+ }
+
public boolean isAutoGeneratedInvertedIndex() {
return _autoGeneratedInvertedIndex;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index f86c807..63cc43c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -81,6 +81,7 @@ public class TableConfigBuilder {
private List<String> _noDictionaryColumns;
private List<String> _onHeapDictionaryColumns;
private List<String> _bloomFilterColumns;
+ private List<String> _rangeIndexColumns;
private Map<String, String> _streamConfigs;
private SegmentPartitionConfig _segmentPartitionConfig;
@@ -229,6 +230,17 @@ public class TableConfigBuilder {
return this;
}
+ /**
+ * Sets the columns for which a range index should be created; the list is passed as-is to the
+ * {@code IndexingConfig}.
+ *
+ * @return this builder
+ */
+ public TableConfigBuilder setRangeIndexColumns(List<String> rangeIndexColumns) {
+ _rangeIndexColumns = rangeIndexColumns;
+ return this;
+ }
+
public TableConfigBuilder setStreamConfigs(Map<String, String> streamConfigs) {
Preconditions.checkState(_tableType == TableType.REALTIME);
_streamConfigs = streamConfigs;
@@ -314,6 +326,7 @@ public class TableConfigBuilder {
indexingConfig.setNoDictionaryColumns(_noDictionaryColumns);
indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns);
indexingConfig.setBloomFilterColumns(_bloomFilterColumns);
+ indexingConfig.setRangeIndexColumns(_rangeIndexColumns);
indexingConfig.setStreamConfigs(_streamConfigs);
indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
index f93ee87..226aef4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
@@ -80,4 +80,20 @@ public class RangePredicateFilter implements PredicateFilter {
}
return false;
}
+
+ /**
+ * Returns the start index of this range filter ({@code _startIndex}).
+ * NOTE(review): presumably the lower bound of the matching range; confirm inclusivity against the constructor.
+ */
+ public int getStartIndex() {
+ return _startIndex;
+ }
+
+ /**
+ * Returns the end index of this range filter ({@code _endIndex}).
+ * NOTE(review): presumably the upper bound of the matching range; confirm inclusivity against the constructor.
+ */
+ public int getEndIndex() {
+ return _endIndex;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org