You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original page) is not preserved in this version.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/12/20 17:44:14 UTC
[incubator-pinot] 02/03: Wiring H3 Index for query processing
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch h3-index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5b13496533012a77efe9553a72db9cd753019b75
Author: kishoreg <g....@gmail.com>
AuthorDate: Sat Dec 19 09:08:32 2020 -0800
Wiring H3 Index for query processing
---
.../org/apache/pinot/core/common/DataSource.java | 9 +
.../operator/filter/H3IndexFilterOperator.java | 65 +++++++
.../org/apache/pinot/core/plan/FilterPlanNode.java | 17 +-
.../request/context/predicate/GeoPredicate.java | 22 +++
.../core/segment/creator/impl/V1Constants.java | 1 +
.../segment/index/column/ColumnIndexContainer.java | 6 +
.../index/column/PhysicalColumnIndexContainer.java | 15 ++
.../segment/index/datasource/BaseDataSource.java | 11 +-
.../index/datasource/ImmutableDataSource.java | 2 +-
.../index/datasource/MutableDataSource.java | 7 +-
.../segment/index/loader/IndexLoadingConfig.java | 10 +
.../segment/index/loader/SegmentPreProcessor.java | 6 +
.../index/loader/invertedindex/H3IndexHandler.java | 203 +++++++++++++++++++++
.../index/readers/geospatial/H3IndexReader.java | 1 -
.../pinot/core/segment/store/ColumnIndexType.java | 3 +-
.../virtualcolumn/VirtualColumnIndexContainer.java | 6 +
.../core/startree/v2/store/StarTreeDataSource.java | 3 +-
.../pinot/spi/config/table/IndexingConfig.java | 9 +
18 files changed, 385 insertions(+), 11 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
index 75f0513..97d3609 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
@@ -25,6 +25,8 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+
/**
* The {@code DataSource} contains all the indexes and metadata for a column for query execution purpose.
@@ -61,6 +63,13 @@ public interface DataSource {
InvertedIndexReader<?> getRangeIndex();
/**
+ * Returns the H3 index for the column if exists, or {@code null} if not.
+ * <p>TODO: Have a separate interface for the H3 index instead of exposing the concrete reader class.
+ */
+ @Nullable
+ H3IndexReader getH3Index();
+
+ /**
* Returns the text index for the column if exists, or {@code null} if not.
*/
@Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
new file mode 100644
index 0000000..98cdf64
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.predicate.GeoPredicate;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * Filter operator that evaluates a geospatial predicate by looking up matching doc ids in the column's H3 index.
+ *
+ * <p>NOTE: The H3 index is only built for raw (no-dictionary) columns; dictionary-encoded columns are not supported
+ * yet.
+ * <p>TODO: Derive the H3 cell id(s) to look up from the {@link GeoPredicate}; a placeholder id is used for now.
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  // TODO: Placeholder until the H3 cell id(s) are computed from the predicate geometry.
+  private static final long PLACEHOLDER_H3_ID = 1000L;
+
+  private final GeoPredicate _geoPredicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+
+  public H3IndexFilterOperator(GeoPredicate geoPredicate, DataSource dataSource, int numDocs) {
+    _geoPredicate = geoPredicate;
+    _dataSource = dataSource;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    // Read the dedicated H3 index: the range index is an InvertedIndexReader and cannot be cast to H3IndexReader.
+    H3IndexReader h3IndexReader = _dataSource.getH3Index();
+    // Explicit check rather than 'assert', which is disabled unless the JVM runs with -ea.
+    if (h3IndexReader == null) {
+      throw new IllegalStateException("H3 index does not exist for the filtered column");
+    }
+
+    ImmutableRoaringBitmap docIds = h3IndexReader.getDocIds(PLACEHOLDER_H3_ID);
+    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs) {
+      // Doc ids come straight from the index bitmap, so no entries are scanned.
+      // TODO: Report scanned entries once exact geometry checks are applied on top of the index lookup.
+      @Override
+      public long getNumEntriesScannedInFilter() {
+        return 0;
+      }
+    });
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 7e076e0..f31fc77 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -30,13 +30,16 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
import org.apache.pinot.core.operator.filter.ExpressionFilterOperator;
import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
+import org.apache.pinot.core.operator.filter.H3IndexFilterOperator;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.operator.filter.TextMatchFilterOperator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.predicate.GeoPredicate;
import org.apache.pinot.core.query.request.context.predicate.Predicate;
import org.apache.pinot.core.query.request.context.predicate.TextMatchPredicate;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
@@ -120,9 +123,17 @@ public class FilterPlanNode implements PlanNode {
Predicate predicate = filter.getPredicate();
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
- // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL,
- // IS_NOT_NULL, TEXT_MATCH)
- return new ExpressionFilterOperator(_indexSegment, predicate, _numDocs);
+ FunctionContext function = lhs.getFunction();
+ if (function.getFunctionName().equalsIgnoreCase("H3_WITHIN")) {
+ String columnName = function.getArguments().get(0).getIdentifier();
+ GeoPredicate geoPredicate = new GeoPredicate();
+ //set geo predicate
+ return new H3IndexFilterOperator(geoPredicate, _indexSegment.getDataSource(columnName), _numDocs);
+ } else {
+ // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL,
+ // IS_NOT_NULL, TEXT_MATCH)
+ return new ExpressionFilterOperator(_indexSegment, predicate, _numDocs);
+ }
} else {
DataSource dataSource = _indexSegment.getDataSource(lhs.getIdentifier());
switch (predicate.getType()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java
new file mode 100644
index 0000000..8f51ae5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.request.context.predicate;
+
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.locationtech.jts.geom.Geometry;
+
+
+/**
+ * Geospatial predicate (e.g. for {@code H3_WITHIN} filters) to be evaluated with an H3 index.
+ *
+ * <p>TODO: Make this flexible
+ */
+public class GeoPredicate {
+
+  /** The geospatial relationship checked by the predicate. */
+  public enum Type {
+    WITHIN, OVERLAP
+  }
+
+  // Left-hand side of the predicate, i.e. the column being filtered.
+  private ExpressionContext _lhs;
+  private Type _type;
+  // Geometry argument of the predicate.
+  private Geometry _geometry;
+  // Distance argument of the predicate. TODO: document the unit.
+  private double _distance;
+
+  public ExpressionContext getLhs() {
+    return _lhs;
+  }
+
+  public void setLhs(ExpressionContext lhs) {
+    _lhs = lhs;
+  }
+
+  public Type getType() {
+    return _type;
+  }
+
+  public void setType(Type type) {
+    _type = type;
+  }
+
+  public Geometry getGeometry() {
+    return _geometry;
+  }
+
+  public void setGeometry(Geometry geometry) {
+    _geometry = geometry;
+  }
+
+  public double getDistance() {
+    return _distance;
+  }
+
+  public void setDistance(double distance) {
+    _distance = distance;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0a09394..362bdbb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -37,6 +37,7 @@ public class V1Constants {
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
+ public static final String BITMAP_H3_INDEX_FILE_EXTENSION = ".bitmap.h3";
public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
index 087f4b2..f8af539 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReaderImpl;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
/**
@@ -48,6 +49,11 @@ public interface ColumnIndexContainer extends Closeable {
InvertedIndexReader<?> getRangeIndex();
/**
+ * Returns the H3 index for the column, or {@code null} if it does not exist.
+ */
+ H3IndexReader getH3Index();
+
+ /**
* Returns the text index for the column, or {@code null} if it does not exist.
*/
TextIndexReader getTextIndex();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 1fe955c..e88d0a8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -49,6 +49,7 @@ import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardInde
import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
import org.apache.pinot.core.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -66,6 +67,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final ForwardIndexReader<?> _forwardIndex;
private final InvertedIndexReader<?> _invertedIndex;
private final InvertedIndexReader<?> _rangeIndex;
+ private final H3IndexReader _h3Index;
private final TextIndexReader _textIndex;
private final BaseImmutableDictionary _dictionary;
private final BloomFilterReader _bloomFilter;
@@ -77,6 +79,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
String columnName = metadata.getColumnName();
boolean loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
boolean loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
+ boolean loadH3Index = indexLoadingConfig.getH3IndexColumns().contains(columnName);
boolean loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
boolean loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
BloomFilterConfig bloomFilterConfig = indexLoadingConfig.getBloomFilterConfigs().get(columnName);
@@ -97,6 +100,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
_textIndex = null;
}
+ if (loadH3Index) {
+ _h3Index = new H3IndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.H3_INDEX));
+ } else {
+ _h3Index = null;
+ }
PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
if (metadata.hasDictionary()) {
@@ -142,6 +150,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
} else {
_rangeIndex = null;
}
+
} else {
// Raw index
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
@@ -150,6 +159,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
_rangeIndex = null;
_invertedIndex = null;
}
+
}
@Override
@@ -168,6 +178,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
}
@Override
+ public H3IndexReader getH3Index() {
+ return _h3Index;
+ }
+
+ @Override
public TextIndexReader getTextIndex() {
return _textIndex;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
index 172e8e6..cf4b449 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
public abstract class BaseDataSource implements DataSource {
@@ -35,19 +36,21 @@ public abstract class BaseDataSource implements DataSource {
private final Dictionary _dictionary;
private final InvertedIndexReader<?> _invertedIndex;
private final InvertedIndexReader<?> _rangeIndex;
+ private final H3IndexReader _h3Index;
private final TextIndexReader _textIndex;
private final BloomFilterReader _bloomFilter;
private final NullValueVectorReader _nullValueVector;
public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex,
@Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
- @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
+ @Nullable InvertedIndexReader<?> rangeIndex, @Nullable H3IndexReader h3Index, @Nullable TextIndexReader textIndex,
@Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
_dataSourceMetadata = dataSourceMetadata;
_forwardIndex = forwardIndex;
_dictionary = dictionary;
_invertedIndex = invertedIndex;
_rangeIndex = rangeIndex;
+ _h3Index = h3Index;
_textIndex = textIndex;
_bloomFilter = bloomFilter;
_nullValueVector = nullValueVector;
@@ -83,6 +86,12 @@ public abstract class BaseDataSource implements DataSource {
@Nullable
@Override
+ public H3IndexReader getH3Index() {
+ return _h3Index;
+ }
+
+ @Nullable
+ @Override
public TextIndexReader getTextIndex() {
return _textIndex;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
index cc09adf..bd24dae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
@@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource {
public ImmutableDataSource(ColumnMetadata columnMetadata, ColumnIndexContainer columnIndexContainer) {
super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(),
columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(),
- columnIndexContainer.getRangeIndex(), columnIndexContainer.getTextIndex(),
+ columnIndexContainer.getRangeIndex(), columnIndexContainer.getH3Index(), columnIndexContainer.getTextIndex(),
columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index a927353..e433ff5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
import org.apache.pinot.spi.data.FieldSpec;
@@ -41,11 +42,11 @@ public class MutableDataSource extends BaseDataSource {
@Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
@Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary,
@Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
- @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
+ @Nullable H3IndexReader h3Index, @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
@Nullable NullValueVectorReader nullValueVector) {
super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
- partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
- nullValueVector);
+ partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, h3Index, textIndex,
+ bloomFilter, nullValueVector);
}
private static class MutableDataSourceMetadata implements DataSourceMetadata {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 04d91d1..62dd06a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -49,6 +49,7 @@ public class IndexLoadingConfig {
private Set<String> _invertedIndexColumns = new HashSet<>();
private Set<String> _textIndexColumns = new HashSet<>();
private Set<String> _rangeIndexColumns = new HashSet<>();
+ private Set<String> _h3IndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
private Map<String, String> _noDictionaryConfig = new HashMap<>();
private Set<String> _varLengthDictionaryColumns = new HashSet<>();
@@ -97,6 +98,11 @@ public class IndexLoadingConfig {
_rangeIndexColumns.addAll(rangeIndexColumns);
}
+ List<String> h3IndexColumns = indexingConfig.getH3IndexColumns();
+ if (h3IndexColumns != null) {
+ _h3IndexColumns.addAll(h3IndexColumns);
+ }
+
List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
if (bloomFilterColumns != null) {
for (String bloomFilterColumn : bloomFilterColumns) {
@@ -226,6 +232,10 @@ public class IndexLoadingConfig {
return _rangeIndexColumns;
}
+ public Set<String> getH3IndexColumns() {
+ return _h3IndexColumns;
+ }
+
public Map<String, Map<String, String>> getColumnProperties() {
return _columnProperties;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..8a17eb6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
+import org.apache.pinot.core.segment.index.loader.invertedindex.H3IndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
@@ -113,6 +114,11 @@ public class SegmentPreProcessor implements AutoCloseable {
new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
rangeIndexHandler.createRangeIndices();
+ // Create column H3 indices according to the index config.
+ H3IndexHandler h3IndexHandler =
+ new H3IndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+ h3IndexHandler.createH3Indices();
+
Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
if (textIndexColumns.size() > 0) {
TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java
new file mode 100644
index 0000000..e8bcbec
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexCreator;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Creates H3 indexes for the columns listed in {@link IndexLoadingConfig#getH3IndexColumns()} while a segment is
+ * being pre-processed.
+ *
+ * <p>Only raw (no-dictionary) single-value {@code BYTES} columns holding serialized geometries are indexed. Creation
+ * is guarded by an in-progress marker file so that an interrupted run is redone on the next invocation.
+ */
+public class H3IndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class);
+
+  // Passed to H3IndexCreator; presumably the H3 resolution (0-15) - confirm against H3IndexCreator.
+  // TODO: Make this configurable per column.
+  private static final int H3_INDEX_RESOLUTION = 5;
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _h3IndexColumns = new HashSet<>();
+
+  public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Only create H3 index on non-dictionary-encoded columns
+    for (String column : indexLoadingConfig.getH3IndexColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.hasDictionary()) {
+        _h3IndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  /**
+   * Creates the H3 index for each eligible column that does not already have one.
+   */
+  public void createH3Indices()
+      throws Exception {
+    for (ColumnMetadata columnMetadata : _h3IndexColumns) {
+      createH3IndexForColumn(columnMetadata);
+    }
+  }
+
+  private void createH3IndexForColumn(ColumnMetadata columnMetadata)
+      throws Exception {
+    String column = columnMetadata.getColumnName();
+    File inProgress = new File(_indexDir, column + ".h3.inprogress");
+    File h3IndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_H3_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.H3_INDEX)) {
+        // Skip creating H3 index if already exists.
+        LOGGER.info("Found h3 index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+      // Remove H3 index if exists.
+      // For v1 and v2, it's the actual H3 index. For v3, it's the temporary H3 index.
+      FileUtils.deleteQuietly(h3IndexFile);
+    }
+
+    // Create new H3 index for the column. Dictionary-encoded columns were already filtered out in the constructor.
+    LOGGER.info("Creating new h3 index for segment: {}, column: {}", _segmentName, column);
+    handleNonDictionaryBasedColumn(columnMetadata);
+
+    // For v3, write the generated H3 index file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, h3IndexFile, ColumnIndexType.H3_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created h3 index for segment: {}, column: {}", _segmentName, column);
+  }
+
+  /**
+   * Builds the H3 index for a raw single-value {@code BYTES} column by deserializing each document's geometry.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws Exception {
+    // Validate the column before opening the forward index reader and the index creator.
+    if (!columnMetadata.isSingleValue()) {
+      // TODO: Support multi-value columns
+      throw new IllegalStateException(
+          "H3 indexing is not supported for Multivalue column : " + columnMetadata.getDataType());
+    }
+    FieldSpec.DataType dataType = columnMetadata.getDataType();
+    if (dataType != FieldSpec.DataType.BYTES) {
+      throw new IllegalStateException("Unsupported data type: " + dataType);
+    }
+
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        H3IndexCreator h3IndexCreator = new H3IndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            H3_INDEX_RESOLUTION)) {
+      for (int i = 0; i < numDocs; i++) {
+        byte[] bytes = forwardIndexReader.getBytes(i, readerContext);
+        Geometry geometry = GeometrySerializer.deserialize(bytes);
+        // JTS returns null for an empty geometry; such a document has no coordinate and is not indexed.
+        Coordinate coordinate = geometry.getCoordinate();
+        if (coordinate == null) {
+          continue;
+        }
+        // NOTE(review): passes (x, y), i.e. (longitude, latitude) for geographic points; confirm this matches the
+        // argument order expected by H3IndexCreator.add.
+        h3IndexCreator.add(i, coordinate.x, coordinate.y);
+      }
+      h3IndexCreator.seal();
+    }
+  }
+
+  /**
+   * Opens the forward index reader that matches the column's encoding (dictionary / raw, single / multi-value).
+   */
+  private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+    int numRows = columnMetadata.getTotalDocs();
+    int numBitsPerValue = columnMetadata.getBitsPerElement();
+    if (columnMetadata.isSingleValue()) {
+      if (columnMetadata.hasDictionary()) {
+        return new FixedBitSVForwardIndexReaderV2(buffer, numRows, numBitsPerValue);
+      }
+      FieldSpec.DataType dataType = columnMetadata.getDataType();
+      switch (dataType) {
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+          return new FixedByteChunkSVForwardIndexReader(buffer, dataType);
+        case STRING:
+        case BYTES:
+          // Variable-length values (e.g. serialized geometries) need the var-byte chunk reader, not the fixed-byte one.
+          return new VarByteChunkSVForwardIndexReader(buffer, dataType);
+        default:
+          throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
+      }
+    }
+    if (columnMetadata.hasDictionary()) {
+      return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
+          numBitsPerValue);
+    }
+    throw new IllegalStateException("Raw index on multi-value column is not supported");
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
index 96fa586..b8a6a6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
@@ -78,7 +78,6 @@ public class H3IndexReader implements Closeable {
}
}
- //todo: fix this
private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int index) {
int currentOffset = getOffset(index);
int bufferLength;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..7cb285f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,7 +25,8 @@ public enum ColumnIndexType {
BLOOM_FILTER("bloom_filter"),
NULLVALUE_VECTOR("nullvalue_vector"),
TEXT_INDEX("text_index"),
- RANGE_INDEX("range_index");
+ RANGE_INDEX("range_index"),
+ H3_INDEX("h3_index");
private final String indexName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index dc67771..3750bfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReaderImpl;
import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
/**
@@ -79,6 +80,11 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
}
@Override
+ public H3IndexReader getH3Index() {
+ // Virtual columns are not backed by an H3 index, so this container always reports none (null).
+ return null;
+ }
+
+ @Override
public void close()
throws IOException {
_forwardIndex.close();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
index c24fb04..868cb47 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
@@ -32,7 +32,8 @@ public class StarTreeDataSource extends BaseDataSource {
 public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, ForwardIndexReader<?> forwardIndex,
 @Nullable Dictionary dictionary) {
- super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, null, null);
+ // A star-tree data source exposes only a forward index and an optional dictionary. Every remaining
+ // index-reader argument of the base class is passed as null.
+ super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, null, null,
+ null);
 }
private static final class StarTreeDataSourceMetadata implements DataSourceMetadata {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 184d00a..92fc97f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -28,6 +28,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
public class IndexingConfig extends BaseJsonConfig {
private List<String> _invertedIndexColumns;
private List<String> _rangeIndexColumns;
+ private List<String> _h3IndexColumns;
private boolean _autoGeneratedInvertedIndex;
private boolean _createInvertedIndexDuringSegmentGeneration;
private List<String> _sortedColumn;
@@ -65,6 +66,14 @@ public class IndexingConfig extends BaseJsonConfig {
_invertedIndexColumns = invertedIndexColumns;
}
+ /**
+ * Returns the columns on which an H3 geospatial index is configured, or null if none were set.
+ */
+ public List<String> getH3IndexColumns() {
+ return _h3IndexColumns;
+ }
+
+ /**
+ * Sets the columns on which an H3 geospatial index should be created.
+ *
+ * @param h3IndexColumns column names to index with H3; may be null to configure none
+ */
+ public void setH3IndexColumns(List<String> h3IndexColumns) {
+ _h3IndexColumns = h3IndexColumns;
+ }
+
+ /**
+ * Returns the columns configured for range indexing, or null if none were set.
+ */
 public List<String> getRangeIndexColumns() {
 return _rangeIndexColumns;
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org