You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/03/19 05:41:50 UTC
[incubator-pinot] 04/06: Wiring up end to end to support indexing
nested fields on complex objects
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch nested-object-indexing-1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 224299deccf308ddfdab2e59556d413944596329
Author: kishore gopalakrishna <g....@gmail.com>
AuthorDate: Thu Jan 24 19:53:29 2019 -0800
Wiring up end to end to support indexing nested fields on complex objects
---
.../org/apache/pinot/common/data/FieldSpec.java | 3 +
.../pinot/common/data/objects/TextObject.java | 3 +-
pinot-core/pom.xml | 19 ++-
.../immutable/ImmutableSegmentLoader.java | 4 +-
.../io/reader/impl/v1/SortedIndexReaderImpl.java | 6 +
.../core/operator/filter/FilterOperatorUtils.java | 8 +-
.../filter/IndexBasedMatchesFilterOperator.java | 86 +++++++++++
.../filter/ScanBasedMatchesFilterOperator.java | 33 ++--
.../MatchesPredicateEvaluatorFactory.java | 21 +--
.../invertedindex/RealtimeInvertedIndexReader.java | 8 +-
.../core/segment/creator/InvertedIndexCreator.java | 8 +
.../core/segment/creator/impl/V1Constants.java | 1 +
.../creator/impl/inv/LuceneIndexCreator.java | 127 +++++++++++++++
.../inv/OffHeapBitmapInvertedIndexCreator.java | 6 +
.../impl/inv/OnHeapBitmapInvertedIndexCreator.java | 6 +
.../index/column/PhysicalColumnIndexContainer.java | 22 ++-
.../index/data/source/ColumnDataSource.java | 2 +-
.../loader/invertedindex/InvertedIndexHandler.java | 106 ++++++++++++-
.../index/readers/BitmapInvertedIndexReader.java | 7 +
.../segment/index/readers/InvertedIndexReader.java | 8 +
.../index/readers/LuceneInvertedIndexReader.java | 157 +++++++++++++++++++
.../virtualcolumn/DocIdVirtualColumnProvider.java | 7 +-
.../SingleStringVirtualColumnProvider.java | 7 +-
.../tests/LuceneIndexClusterIntegrationTest.java | 172 +++++++++++++++++++++
pinot-perf/pom.xml | 35 ++++-
.../org/apache/pinot/perf/LuceneBenchmark.java | 80 ++++++++++
.../apache/pinot/tools/perf/ZookeeperLauncher.java | 2 +-
27 files changed, 887 insertions(+), 57 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
index 080f0e7..ad5a5a3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
@@ -335,6 +335,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
case STRING:
jsonSchema.set("type", convertStringsToJsonArray("null", "string"));
return jsonSchema;
+ case BYTES:
+ jsonSchema.set("type", convertStringsToJsonArray("null", "bytes"));
+ return jsonSchema;
default:
throw new UnsupportedOperationException();
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
index cf5f27e..c171c40 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
@@ -27,7 +27,8 @@ import com.google.common.collect.Lists;
public class TextObject implements PinotObject {
byte[] _bytes;
- private static List<String> _FIELDS = Lists.newArrayList("Content");
+ public static String DEFAULT_FIELD = "Content";
+ private static List<String> _FIELDS = Lists.newArrayList(DEFAULT_FIELD);
@Override
public void init(byte[] bytes) {
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 8f4efe2..4c8215a 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -196,7 +196,24 @@
</exclusion>
</exclusions>
</dependency>
-
+
+ <!-- Lucene -->
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-queryparser</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analyzers-common</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.mockito</groupId>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 2ddb80c..3d7ac31 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -118,8 +118,8 @@ public class ImmutableSegmentLoader {
SegmentDirectory.Reader segmentReader = segmentDirectory.createReader();
Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
- indexContainerMap
- .put(entry.getKey(), new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig));
+ indexContainerMap.put(entry.getKey(),
+ new PhysicalColumnIndexContainer(indexDir, segmentReader, entry.getValue(), indexLoadingConfig));
}
if (schema == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
index 0018b8c..a393dd4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.io.reader.impl.v1;
import com.google.common.base.Preconditions;
import java.io.IOException;
import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.ReaderContext;
import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
@@ -120,6 +122,10 @@ public class SortedIndexReaderImpl extends BaseSingleColumnSingleValueReader<Sor
}
@Override
+ public IntPair getDocIds(Predicate predicate) {
+ throw new UnsupportedOperationException("");
+ }
+ @Override
public Pairs.IntPair getDocIds(int dictId) {
return new Pairs.IntPair(_reader.getInt(2 * dictId), _reader.getInt(2 * dictId + 1));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index bc46b2f..f4354ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -56,7 +56,13 @@ public class FilterOperatorUtils {
// Use inverted index if the predicate type is not RANGE or REGEXP_LIKE for efficiency
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
Predicate.Type predicateType = predicateEvaluator.getPredicateType();
- if (dataSourceMetadata.hasInvertedIndex() && (predicateType != Predicate.Type.RANGE) && (predicateType
+ if(predicateType == Predicate.Type.MATCHES) {
+ if(dataSourceMetadata.hasInvertedIndex()) {
+ return new IndexBasedMatchesFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
+ } else {
+ return new ScanBasedMatchesFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
+ }
+ } else if (dataSourceMetadata.hasInvertedIndex() && (predicateType != Predicate.Type.RANGE) && (predicateType
!= Predicate.Type.REGEXP_LIKE)) {
if (dataSourceMetadata.isSorted()) {
return new SortedInvertedIndexBasedFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java
new file mode 100644
index 0000000..50f39b7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.search.TopDocs;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.common.predicate.MatchesPredicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.MatchesPredicateEvaluatorFactory.DefaultMatchesPredicateEvaluator;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexBasedMatchesFilterOperator extends BaseFilterOperator {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IndexBasedMatchesFilterOperator.class);
+ private static final String OPERATOR_NAME = "IndexBasedMatchesFilterOperator";
+
+ private final DataSource _dataSource;
+ private final int _startDocId;
+ // TODO: change it to exclusive
+ // Inclusive
+ private final int _endDocId;
+ private MatchesPredicate _matchesPredicate;
+
+ public IndexBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator,
+ DataSource dataSource, int startDocId, int endDocId) {
+ // NOTE:
+ // A predicate that always evaluates to true or false should not be passed
+ // into IndexBasedMatchesFilterOperator, for performance reasons.
+ // If the predicate always evaluates to true, use MatchAllFilterOperator
+ // instead.
+ // If the predicate always evaluates to false, use EmptyFilterOperator
+ // instead.
+ Preconditions
+ .checkArgument(!predicateEvaluator.isAlwaysTrue() && !predicateEvaluator.isAlwaysFalse());
+ Preconditions.checkArgument(predicateEvaluator instanceof DefaultMatchesPredicateEvaluator);
+
+ DefaultMatchesPredicateEvaluator evaluator =
+ (DefaultMatchesPredicateEvaluator) predicateEvaluator;
+ _matchesPredicate = evaluator.getMatchesPredicate();
+ _dataSource = dataSource;
+ _startDocId = startDocId;
+ _endDocId = endDocId;
+ }
+
+ @Override
+ protected FilterBlock getNextBlock() {
+
+ InvertedIndexReader invertedIndex = _dataSource.getInvertedIndex();
+ MutableRoaringBitmap bitmap = (MutableRoaringBitmap) invertedIndex.getDocIds(_matchesPredicate);
+
+ boolean exclusive = false;
+ ImmutableRoaringBitmap[] bitmapArray = new ImmutableRoaringBitmap[] {
+ bitmap
+ };
+ return new FilterBlock(new BitmapDocIdSet(bitmapArray, _startDocId, _endDocId, exclusive));
+ }
+
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
similarity index 59%
copy from pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
index cf5f27e..1f730e1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
@@ -16,36 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.data.objects;
+package org.apache.pinot.core.operator.filter;
-import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
-import org.apache.pinot.common.data.PinotObject;
+public class ScanBasedMatchesFilterOperator extends BaseFilterOperator {
-import com.google.common.collect.Lists;
-
-public class TextObject implements PinotObject {
-
- byte[] _bytes;
- private static List<String> _FIELDS = Lists.newArrayList("Content");
-
- @Override
- public void init(byte[] bytes) {
- _bytes = bytes;
+ public ScanBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator,
+ DataSource dataSource, int startDocId, int endDocId) {
+ // TODO Auto-generated constructor stub
}
@Override
- public byte[] toBytes() {
- return _bytes;
- }
-
- @Override
- public List<String> getPropertyNames() {
- return _FIELDS;
+ protected FilterBlock getNextBlock() {
+ // TODO Auto-generated method stub
+ return null;
}
@Override
- public Object getProperty(String field) {
+ public String getOperatorName() {
// TODO Auto-generated method stub
return null;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
index 26b7e16..684573f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
@@ -34,17 +34,15 @@ public class MatchesPredicateEvaluatorFactory {
*/
public static BaseRawValueBasedPredicateEvaluator newRawValueBasedEvaluator(
MatchesPredicate textMatchPredicate, FieldSpec.DataType dataType) {
- return new RawValueBasedTextMatchPredicateEvaluator(textMatchPredicate);
+ return new DefaultMatchesPredicateEvaluator(textMatchPredicate);
}
- public static final class RawValueBasedTextMatchPredicateEvaluator
+ public static final class DefaultMatchesPredicateEvaluator
extends BaseRawValueBasedPredicateEvaluator {
- String _query;
- String _options;
+ private MatchesPredicate _matchesPredicate;
- public RawValueBasedTextMatchPredicateEvaluator(MatchesPredicate textMatchPredicate) {
- _query = textMatchPredicate.getQuery();
- _options = textMatchPredicate.getQueryOptions();
+ public DefaultMatchesPredicateEvaluator(MatchesPredicate matchesPredicate) {
+ this._matchesPredicate = matchesPredicate;
}
@Override
@@ -58,12 +56,9 @@ public class MatchesPredicateEvaluatorFactory {
"Text Match is not supported via scanning, its supported only via inverted index");
}
- public String getQueryString() {
- return _query;
- }
-
- public String getQueryOptions() {
- return _options;
+ public MatchesPredicate getMatchesPredicate(){
+ return _matchesPredicate;
}
+
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 363f688..4c45850 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.realtime.impl.invertedindex;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.pinot.core.common.Predicate;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -54,7 +56,11 @@ public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableR
_bitmaps.get(dictId).checkAndAdd(docId);
}
}
-
+
+ @Override
+ public MutableRoaringBitmap getDocIds(Predicate predicate) {
+ throw new UnsupportedOperationException("Predicate:"+ predicate + " is not supported");
+ }
@Override
public MutableRoaringBitmap getDocIds(int dictId) {
ThreadSafeMutableRoaringBitmap bitmap;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
index 3eafa9d..d5dadcf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.creator;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.pinot.common.data.PinotObject;
+
/**
* Currently only support RoaringBitmap inverted index.
@@ -65,6 +67,12 @@ public interface InvertedIndexCreator extends Closeable {
* For multi-valued column, adds the dictionary Ids for the next document.
*/
void add(int[] dictIds, int length);
+
+ /**
+ * For complex data types such as Map, JSON and TEXT, adds the object for the next document.
+ * @param object the complex object to add to the index
+ */
+ void add(PinotObject object);
/**
* Seals the index and flushes it to disk.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 3536018..7153f3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -60,6 +60,7 @@ public class V1Constants {
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+ public static final String LUCENE_INVERTED_INDEX_DIR = ".lucene.inv";
public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
new file mode 100644
index 0000000..6622b57
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.index.ColumnMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LuceneIndexCreator implements InvertedIndexCreator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InvertedIndexCreator.class);
+
+ public static final String VERSION = "7.6.0";
+ // Index file will be flushed after reaching this threshold
+ private static final int MAX_BUFFER_SIZE_MB = 500;
+ private static final Field.Store DEFAULT_STORE = Field.Store.NO;
+ private final Analyzer _analyzer;
+ private final IndexWriter _writer;
+ private final IndexWriterConfig _indexWriterConfig;
+ private final Directory _indexDirectory;
+ // TODO: Figure out a way to avoid this special-case flag for TEXT objects
+ boolean _isText = false;
+
+ public LuceneIndexCreator(ColumnMetadata columnMetadata, File outputDirectory) {
+ // TODO: Get IndexConfig and set a different analyzer for each field. By default we use
+ // StandardAnalyzer and TextField, which can be expensive and inefficient if all we need is
+ // exact match (see the keyword analyzer).
+ _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
+ _indexWriterConfig = new IndexWriterConfig(_analyzer);
+ _indexWriterConfig.setRAMBufferSizeMB(MAX_BUFFER_SIZE_MB);
+ _isText = "TEXT".equalsIgnoreCase(columnMetadata.getObjectType());
+ try {
+ _indexDirectory = FSDirectory.open(outputDirectory.toPath());
+ _writer = new IndexWriter(_indexDirectory, _indexWriterConfig);
+ } catch (IOException e) {
+ LOGGER.error("Encountered error creating LuceneIndexCreator ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void add(int dictId) {
+ throw new UnsupportedOperationException(
+ "Lucene indexing not supported for dictionary encoded columns");
+ }
+
+ @Override
+ public void add(int[] dictIds, int length) {
+ throw new UnsupportedOperationException(
+ "Lucene indexing not supported for dictionary encoded columns");
+
+ }
+
+ @Override
+ public void add(PinotObject object) {
+ Document document = new Document();
+ List<String> propertyNames = object.getPropertyNames();
+ for (String propertyName : propertyNames) {
+ Field field;
+ // TODO: Figure out a way to avoid special casing Text, have a way to get propertyType from
+ // pinotObject?
+ // TODO: Handle list field
+ Object value = object.getProperty(propertyName);
+ if (value.getClass().isAssignableFrom(List.class)) {
+ List<?> list = (List<?>) value;
+ for (Object item : list) {
+ field = new TextField(propertyName, item.toString(), DEFAULT_STORE);
+ document.add(field);
+ }
+ } else {
+ field = new TextField(propertyName, value.toString(), DEFAULT_STORE);
+ document.add(field);
+ }
+ }
+ try {
+ _writer.addDocument(document);
+ } catch (IOException e) {
+ LOGGER.error("Encountered exception while adding doc:{}", document.toString(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void seal() throws IOException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ _writer.close();
+ }
+
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index c4e30b7..b525427 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -26,6 +26,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -119,6 +120,11 @@ public final class OffHeapBitmapInvertedIndexCreator implements InvertedIndexCre
}
@Override
+ public void add(PinotObject object) {
+ throw new UnsupportedOperationException("Bitmap Indexing not supported for Pinot Objects");
+ }
+
+ @Override
public void add(int dictId) {
putInt(_forwardIndexValueBuffer, _nextDocId++, dictId);
putInt(_invertedIndexLengthBuffer, dictId, getInt(_invertedIndexLengthBuffer, dictId) + 1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
index c358361..83bc69c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.PinotObject;
import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -47,6 +48,11 @@ public final class OnHeapBitmapInvertedIndexCreator implements InvertedIndexCrea
}
@Override
+ public void add(PinotObject object) {
+ throw new UnsupportedOperationException("Bitmap Indexing not supported for Pinot Objects");
+ }
+
+ @Override
public void add(int dictId) {
_bitmaps[dictId].add(_nextDocId++);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index df83533..29b6b46 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.core.segment.index.column;
+import java.io.File;
import java.io.IOException;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
@@ -39,6 +41,7 @@ import org.apache.pinot.core.segment.index.readers.ImmutableDictionaryReader;
import org.apache.pinot.core.segment.index.readers.IntDictionary;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.OnHeapDoubleDictionary;
import org.apache.pinot.core.segment.index.readers.OnHeapFloatDictionary;
import org.apache.pinot.core.segment.index.readers.OnHeapIntDictionary;
@@ -60,7 +63,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final ImmutableDictionaryReader _dictionary;
private final BloomFilterReader _bloomFilterReader;
- public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
+ public PhysicalColumnIndexContainer(File indexDir, SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
IndexLoadingConfig indexLoadingConfig)
throws IOException {
String columnName = metadata.getColumnName();
@@ -105,16 +108,25 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
metadata.getBitsPerElement());
}
if (loadInvertedIndex) {
- _invertedIndex =
- new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
- metadata.getCardinality());
+ if (metadata.getObjectType() == null && metadata.getFieldSpec().getDataType() != DataType.BYTES) {
+ _invertedIndex =
+ new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
+ metadata.getCardinality());
+ } else {
+ _invertedIndex = new LuceneInvertedIndexReader(indexDir, metadata);
+ }
} else {
_invertedIndex = null;
}
} else {
// Raw index
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
- _invertedIndex = null;
+ if (loadInvertedIndex && (metadata.getObjectType() != null
+ || metadata.getFieldSpec().getDataType() == DataType.BYTES)) {
+ _invertedIndex = new LuceneInvertedIndexReader(indexDir, metadata);
+ } else {
+ _invertedIndex = null;
+ }
_dictionary = null;
_bloomFilterReader = null;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
index 3044457..2734f75 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -87,7 +87,7 @@ public final class ColumnDataSource extends DataSource {
}
} else {
// Raw index
- Preconditions.checkState(invertedIndex == null);
+ //Preconditions.checkState(invertedIndex == null);
}
_operatorName = "ColumnDataSource [" + columnName + "]";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 15fa01d..b9a2bec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -20,16 +20,24 @@ package org.apache.pinot.core.segment.index.loader.invertedindex;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.common.data.objects.JSONObject;
+import org.apache.pinot.common.data.objects.MapObject;
+import org.apache.pinot.common.data.objects.TextObject;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.LuceneIndexCreator;
import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -70,11 +78,99 @@ public class InvertedIndexHandler {
public void createInvertedIndices()
throws IOException {
for (ColumnMetadata columnMetadata : _invertedIndexColumns) {
- createInvertedIndexForColumn(columnMetadata);
+ String objectType = columnMetadata.getObjectType();
+ if (objectType == null) {
+ createInvertedIndexForSimpleField(columnMetadata);
+ } else {
+ createInvertedIndexForComplexObject(columnMetadata);
+ }
}
}
- private void createInvertedIndexForColumn(ColumnMetadata columnMetadata)
+  /**
+   * Creates a Lucene based inverted index for a column that stores complex objects.
+   *
+   * <p>The column's object type selects the {@link PinotObject} implementation used to decode each
+   * forward index entry: {@code MAP}, {@code JSON} and {@code TEXT} (case insensitive) map to the
+   * built-in classes; any other value is treated as a fully qualified class name. If the object
+   * cannot be instantiated, the failure is logged (with its cause) and index creation is skipped;
+   * the marker file is left in place in that case, so the next run takes the "interrupted" path.
+   *
+   * <p>A marker file ({@code <column>.inv.inprogress}) guards the build: when it already exists on
+   * entry, any existing index directory is deleted and the index is rebuilt from scratch.
+   *
+   * @param columnMetadata metadata of the column to index
+   * @throws IOException if reading the forward index or writing the inverted index fails
+   * @throws UnsupportedOperationException if the column is multi-valued
+   */
+  @SuppressWarnings("unchecked")
+  private void createInvertedIndexForComplexObject(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + ".inv.inprogress");
+    File invertedIndexDir = new File(_indexDir, column + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (invertedIndexDir.exists()) {
+        // Skip creating inverted index if already exists.
+        LOGGER.info("Found inverted index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run got interrupted.
+      // Remove the (possibly partial) inverted index directory if it exists.
+      FileUtils.deleteQuietly(invertedIndexDir);
+    }
+
+    // Create new inverted index for the column.
+    LOGGER.info("Creating new lucene based inverted index for segment: {}, column: {}", _segmentName, column);
+    int numDocs = columnMetadata.getTotalDocs();
+    String objectType = columnMetadata.getObjectType();
+    PinotObject pinotObject;
+    try {
+      Class<? extends PinotObject> pinotObjectClazz;
+      switch (objectType.toUpperCase()) {
+        case "MAP":
+          pinotObjectClazz = MapObject.class;
+          break;
+        case "JSON":
+          pinotObjectClazz = JSONObject.class;
+          break;
+        case "TEXT":
+          pinotObjectClazz = TextObject.class;
+          break;
+        default:
+          // Custom object type: the object type is the fully qualified class name.
+          pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType);
+      }
+      // Requires a public no-arg constructor on the PinotObject implementation.
+      pinotObject = pinotObjectClazz.getConstructor().newInstance();
+    } catch (Exception e) {
+      // Pass the exception as the last argument so the cause (missing class, missing constructor,
+      // constructor failure, ...) ends up in the log instead of being silently dropped.
+      LOGGER.error("Error pinot object for type:{}. Skipping inverted index creation", objectType, e);
+      return;
+    }
+
+    try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(columnMetadata, invertedIndexDir)) {
+      try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) {
+        if (columnMetadata.isSingleValue()) {
+          // Single-value column: the same PinotObject instance is re-initialized for every document.
+          VarByteChunkSingleValueReader svFwdIndex = (VarByteChunkSingleValueReader) fwdIndex;
+          for (int i = 0; i < numDocs; i++) {
+            byte[] bytes = svFwdIndex.getBytes(i);
+
+            pinotObject.init(bytes);
+            luceneIndexCreator.add(pinotObject);
+          }
+        } else {
+          throw new UnsupportedOperationException("Multi Value not supported for complex object types");
+        }
+        luceneIndexCreator.seal();
+      }
+    }
+    String tarGzPath = TarGzCompressionUtils.createTarGzOfDirectory(invertedIndexDir.getAbsolutePath());
+
+    // For v3, write the tar.gz of the generated inverted index directory into the single file.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, new File(tarGzPath), ColumnIndexType.INVERTED_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created inverted index for segment: {}, column: {}", _segmentName, column);
+  }
+
+
+ private void createInvertedIndexForSimpleField(ColumnMetadata columnMetadata)
throws IOException {
String column = columnMetadata.getColumnName();
@@ -146,7 +242,11 @@ public class InvertedIndexHandler {
int numRows = columnMetadata.getTotalDocs();
int numBitsPerValue = columnMetadata.getBitsPerElement();
if (columnMetadata.isSingleValue()) {
- return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+ if (columnMetadata.hasDictionary()) {
+ return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+ } else {
+ return new VarByteChunkSingleValueReader(buffer);
+ }
} else {
return new FixedBitMultiValueReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), numBitsPerValue);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
index d68af16..d9d830a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
+
+import org.apache.pinot.core.common.Predicate;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.slf4j.Logger;
@@ -52,6 +54,11 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR
load(indexDataBuffer);
}
+ /**
+ * Predicate based lookup is not available on the bitmap inverted index; this implementation
+ * always throws.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public ImmutableRoaringBitmap getDocIds(Predicate predicate) {
+ throw new UnsupportedOperationException("Predicate based evaluation not supported for Bitmap based Indexing scheme");
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
index 6a949c0..6b42f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
@@ -20,6 +20,8 @@ package org.apache.pinot.core.segment.index.readers;
import java.io.Closeable;
+import org.apache.pinot.core.common.Predicate;
+
public interface InvertedIndexReader<T> extends Closeable {
@@ -27,4 +29,10 @@ public interface InvertedIndexReader<T> extends Closeable {
* Get the document ids for the given dictionary id.
*/
T getDocIds(int dictId);
+
+ /**
+ * Get the document ids for the given predicate.
+ */
+ T getDocIds(Predicate predicate);
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
new file mode 100644
index 0000000..576bcc4
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.queryparser.flexible.core.nodes.FieldQueryNode;
+import org.apache.lucene.queryparser.surround.query.FieldsQuery;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.FieldSpec.FieldType;
+import org.apache.pinot.common.data.objects.TextObject;
+import org.apache.pinot.core.common.Predicate;
+import org.apache.pinot.core.common.predicate.MatchesPredicate;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.ColumnMetadata;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * {@link InvertedIndexReader} that answers {@link MatchesPredicate} queries from a Lucene index
+ * stored in a directory next to the segment's other index files.
+ *
+ * <p>Dictionary id based lookups are not supported by this reader.
+ */
+public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoaringBitmap> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LuceneInvertedIndexReader.class);
+  private final Analyzer _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
+  private final Directory _index;
+  private final IndexReader _reader;
+  private final IndexSearcher _searcher;
+
+  /**
+   * Opens the Lucene index of the given column.
+   *
+   * <p>TODO: Change this to take PinotDataBuffer, work around for now since Lucene needs actual
+   * directory
+   *
+   * @param segmentIndexDir segment directory that contains the column's Lucene index directory
+   * @param metadata metadata of the indexed column; its name determines the index directory name
+   * @throws RuntimeException wrapping the {@link IOException} if the index cannot be opened
+   */
+  public LuceneInvertedIndexReader(File segmentIndexDir, ColumnMetadata metadata) {
+    File searchIndexDir = new File(segmentIndexDir.getPath(),
+        metadata.getColumnName() + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR);
+    Directory directory = null;
+    try {
+      directory = FSDirectory.open(searchIndexDir.toPath());
+      IndexReader reader = DirectoryReader.open(directory);
+      _index = directory;
+      _reader = reader;
+      _searcher = new IndexSearcher(reader);
+    } catch (IOException e) {
+      LOGGER.error("Encountered error creating LuceneSearchIndexReader ", e);
+      // Do not leak the directory handle when the index itself cannot be opened.
+      if (directory != null) {
+        try {
+          directory.close();
+        } catch (IOException closeException) {
+          e.addSuppressed(closeException);
+        }
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Releases the Lucene index reader and the underlying directory.
+   */
+  @Override
+  public void close() throws IOException {
+    // The IndexSearcher does not own the reader, so it has to be closed explicitly; the directory
+    // is closed even if closing the reader fails.
+    try {
+      _reader.close();
+    } finally {
+      _index.close();
+    }
+  }
+
+  /**
+   * Not supported: the Lucene index is not keyed by dictionary id.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public MutableRoaringBitmap getDocIds(int dictId) {
+    throw new UnsupportedOperationException(
+        "DictId based evaluation not supported for Lucene based Indexing scheme");
+  }
+
+  /**
+   * Evaluates the given predicate against the Lucene index.
+   *
+   * @param predicate must be a {@link MatchesPredicate}; any other type fails the cast
+   * @return bitmap of the matching document ids
+   */
+  @Override
+  public MutableRoaringBitmap getDocIds(Predicate predicate) {
+    MatchesPredicate matchesPredicate = (MatchesPredicate) predicate;
+    return getDocIds(matchesPredicate.getQuery(), matchesPredicate.getQueryOptions());
+  }
+
+  /**
+   * Parses {@code queryStr} with the classic Lucene query parser (default field
+   * {@link TextObject#DEFAULT_FIELD}) and collects every matching document id.
+   *
+   * @param queryStr query in Lucene classic query syntax
+   * @param options query options; currently not used by this method
+   * @return bitmap of the matching document ids
+   * @throws RuntimeException wrapping the cause if the query cannot be parsed or executed
+   */
+  public MutableRoaringBitmap getDocIds(String queryStr, String options) {
+    // A new parser is created for every call.
+    QueryParser queryParser = new QueryParser(TextObject.DEFAULT_FIELD, _analyzer);
+    Query query;
+    try {
+      query = queryParser.parse(queryStr);
+    } catch (ParseException e) {
+      LOGGER.error("Encountered exception while parsing query {}", queryStr, e);
+      throw new RuntimeException(e);
+    }
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    try {
+      Collector collector = createCollector(bitmap);
+      _searcher.search(query, collector);
+    } catch (IOException e) {
+      LOGGER.error("Encountered exception while executing search query {}", queryStr, e);
+      throw new RuntimeException(e);
+    }
+    return bitmap;
+  }
+
+  private Collector createCollector(final MutableRoaringBitmap bitmap) {
+    return new LuceneResultCollector(bitmap);
+  }
+
+  /**
+   * Lucene {@link Collector} that records every matching document id in a bitmap and does not
+   * request scores.
+   */
+  public static final class LuceneResultCollector implements Collector {
+    private final MutableRoaringBitmap _bitmap;
+
+    /**
+     * @param bitmap bitmap the matching document ids are added to
+     */
+    public LuceneResultCollector(MutableRoaringBitmap bitmap) {
+      _bitmap = bitmap;
+    }
+
+    @Override
+    public boolean needsScores() {
+      return false;
+    }
+
+    @Override
+    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+      // Doc ids passed to LeafCollector#collect are relative to the current leaf (Lucene segment).
+      // They must be re-based with the leaf's docBase to obtain index-wide doc ids; without this,
+      // hits from every leaf but the first would be recorded under the wrong ids.
+      final int docBase = context.docBase;
+      return new LeafCollector() {
+
+        @Override
+        public void setScorer(Scorer scorer) throws IOException {
+          // Scores are not needed.
+        }
+
+        @Override
+        public void collect(int doc) throws IOException {
+          _bitmap.add(docBase + doc);
+        }
+      };
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
index 22ce9ad..9662d0e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn;
import java.io.IOException;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
@@ -108,7 +110,10 @@ public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
public Pairs.IntPair getDocIds(int dictId) {
return new Pairs.IntPair(dictId, dictId);
}
-
+ /**
+ * Predicate based lookup is not supported for this virtual column.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public IntPair getDocIds(Predicate predicate) {
+ throw new UnsupportedOperationException(
+ "Predicate based evaluation not supported for doc id virtual column inverted index");
+ }
@Override
public void close()
throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
index 6082306..8672fed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn;
import java.io.IOException;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.reader.impl.v1.SortedIndexReader;
@@ -75,7 +77,10 @@ public abstract class SingleStringVirtualColumnProvider extends BaseVirtualColum
public SingleStringInvertedIndex(int length) {
_length = length;
}
-
+ /**
+ * Predicate based lookup is not supported for this virtual column.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public IntPair getDocIds(Predicate predicate) {
+ throw new UnsupportedOperationException(
+ "Predicate based evaluation not supported for single string virtual column inverted index");
+ }
@Override
public Pairs.IntPair getDocIds(int dictId) {
if (dictId == 0) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java
new file mode 100644
index 0000000..ab4d561
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.tools.data.generator.AvroWriter;
+import org.apache.pinot.tools.query.comparison.StarTreeQueryGenerator;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+/**
+ * Cluster integration test that builds an offline table with a single JSON object column
+ * ({@code headers}) configured for inverted indexing and runs a {@code matches} query against it.
+ */
+public class LuceneIndexClusterIntegrationTest extends BaseClusterIntegrationTest {
+  protected static final String DEFAULT_TABLE_NAME = "myTable";
+  static final long TOTAL_DOCS = 1_000L;
+
+  protected Schema _schema;
+  private String _currentTable;
+
+  @Nonnull
+  @Override
+  protected String getTableName() {
+    return _currentTable;
+  }
+
+  @Nonnull
+  @Override
+  protected String getSchemaFileName() {
+    // NOTE(review): returns null despite @Nonnull; the schema is built programmatically in setUp().
+    return null;
+  }
+
+  /**
+   * Starts the cluster, creates the schema and table, generates and uploads the segments and waits
+   * until all documents are loaded.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServers(1);
+
+    _schema = new Schema();
+    FieldSpec jsonFieldSpec = new DimensionFieldSpec();
+    jsonFieldSpec.setDataType(DataType.BYTES);
+    jsonFieldSpec.setDefaultNullValue("{}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    jsonFieldSpec.setObjectType("JSON");
+    jsonFieldSpec.setName("headers");
+    jsonFieldSpec.setSingleValueField(true);
+    _schema.addField(jsonFieldSpec);
+
+    // Create the tables
+    ArrayList<String> invertedIndexColumns = Lists.newArrayList("headers");
+    addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null, SegmentVersion.v1,
+        invertedIndexColumns, null, null);
+
+    setUpSegmentsAndQueryGenerator();
+
+    // Wait for all documents loaded
+    _currentTable = DEFAULT_TABLE_NAME;
+    waitForAllDocsLoaded(10_000);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return TOTAL_DOCS;
+  }
+
+  /**
+   * Writes {@link #TOTAL_DOCS} Avro records, each holding a small JSON document in the
+   * {@code headers} column, then builds and uploads segments from them.
+   */
+  protected void setUpSegmentsAndQueryGenerator() throws Exception {
+    org.apache.avro.Schema avroSchema = AvroWriter.getAvroSchema(_schema);
+    // Keep the generated Avro file under the test's temp directory so tearDown() removes it,
+    // instead of a hard-coded /tmp path that is never cleaned up.
+    File avroFile = new File(new File(_tempDir, "luceneTest"), "part-" + 0 + ".avro");
+    avroFile.getParentFile().mkdirs();
+    ObjectMapper mapper = new ObjectMapper();
+    // try-with-resources makes sure the writer is closed even if a record fails to serialize.
+    try (DataFileWriter<GenericData.Record> recordWriter =
+        new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(avroSchema))) {
+      recordWriter.create(avroSchema, avroFile);
+      for (int i = 0; i < TOTAL_DOCS; i++) {
+        ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
+        objectNode.put("k1", "value" + i);
+        objectNode.put("k2", "value" + i);
+        String json = mapper.writeValueAsString(objectNode);
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        // Encode explicitly as UTF-8 rather than relying on the platform default charset.
+        ByteBuffer byteBuffer = ByteBuffer.wrap(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        record.put("headers", byteBuffer);
+        recordWriter.append(record);
+      }
+    }
+
+    List<File> avroFiles = Lists.newArrayList(avroFile);
+
+    // Create and upload segments without star tree indexes from Avro data
+    createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
+  }
+
+  /**
+   * Builds segments from the given Avro files, waits for generation to finish and uploads them.
+   */
+  private void createAndUploadSegments(List<File> avroFiles, String tableName,
+      boolean createStarTreeIndex) throws Exception {
+    System.out.println("SEGMENT_DIR" + _segmentDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName,
+          createStarTreeIndex, null, null, _schema, executor);
+    } finally {
+      executor.shutdown();
+    }
+    // Fail loudly instead of uploading a partial set of segments when generation times out.
+    if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
+      throw new IllegalStateException("Timed out waiting for segment generation to finish");
+    }
+
+    uploadSegments(_tarDir);
+  }
+
+  @Test
+  public void testQueries() throws Exception {
+    // NOTE(review): the generated values are "value<i>" (no dash) while the query looks for
+    // "value-1"; confirm which one is intended and assert on the expected count once decided.
+    String pqlQuery =
+        "Select count(*) from " + DEFAULT_TABLE_NAME + " WHERE headers matches('k1:\"value\\-1\"','')";
+    JsonNode pinotResponse = postQuery(pqlQuery);
+    org.testng.Assert.assertNotNull(pinotResponse, "No response for query: " + pqlQuery);
+    System.out.println(pinotResponse);
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    dropOfflineTable(DEFAULT_TABLE_NAME);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+}
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 68393be..dd795c1 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -19,7 +19,8 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -101,6 +102,25 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-queryparser</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analyzers-common</artifactId>
+ <version>7.6.0</version>
+ </dependency>
+ <dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.15</version>
@@ -180,15 +200,20 @@
<binFileExtensions>
<unix>.sh</unix>
</binFileExtensions>
- <!-- Set the target configuration directory to be used in the bin scripts -->
+ <!-- Set the target configuration directory to be used in the bin
+ scripts -->
<configurationDirectory>conf</configurationDirectory>
- <!-- Copy the contents from "/src/main/config" to the target configuration directory in the assembled application -->
+ <!-- Copy the contents from "/src/main/config" to the target configuration
+ directory in the assembled application -->
<copyConfigurationDirectory>false</copyConfigurationDirectory>
- <!-- Include the target configuration directory in the beginning of the classpath declaration in the bin scripts -->
+ <!-- Include the target configuration directory in the beginning
+ of the classpath declaration in the bin scripts -->
<includeConfigurationDirectoryInClasspath>false</includeConfigurationDirectoryInClasspath>
<assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
<!-- Extra JVM arguments that will be included in the bin scripts -->
- <extraJvmArguments>-Xms24G -Xmx24G -Dlog4j.configuration=log4j.properties</extraJvmArguments>
+ <extraJvmArguments>-Xms24G -Xmx24G
+ -Dlog4j.configuration=log4j.properties
+ </extraJvmArguments>
<!-- Generate bin scripts for windows and unix pr default -->
<platforms>
<platform>unix</platform>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java
new file mode 100644
index 0000000..2af2ed5
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+/**
+ * Stand-alone scratch program that indexes 1000 small documents into an in-memory Lucene index,
+ * runs one query through {@link LuceneInvertedIndexReader.LuceneResultCollector} and prints the
+ * number of hits.
+ */
+public class LuceneBenchmark {
+
+  public static void main(String[] args) throws IOException, ParseException {
+    Analyzer wrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
+
+    // The directory and the reader are closed via try-with-resources so nothing is leaked.
+    try (Directory index = new RAMDirectory()) {
+      IndexWriterConfig config = new IndexWriterConfig(wrapper);
+
+      // 1. indexing
+      try (IndexWriter writer = new IndexWriter(index, config)) {
+        for (int i = 0; i < 1000; i++) {
+          Document doc = new Document();
+          doc.add(new TextField("k1", "value-" + i, Field.Store.YES));
+          doc.add(new TextField("k2", "value-" + i, Field.Store.YES));
+          writer.addDocument(doc);
+        }
+      }
+
+      // 2. query
+      String querystr;
+      querystr = "k1:\"value1?0\"";
+      Query q = new QueryParser("Content", wrapper).parse(querystr);
+      // NOTE(review): the parsed query above is immediately replaced, so the query that actually
+      // runs is the wildcard query below, not querystr. QueryParser.escape also escapes '*', so
+      // the term is presumably matched literally rather than as a wildcard - confirm the intent.
+      q = new WildcardQuery(new Term("k1", QueryParser.escape("value1*")));
+
+      // 3. searching
+      try (IndexReader reader = DirectoryReader.open(index)) {
+        IndexSearcher searcher = new IndexSearcher(reader);
+        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+        Collector collector = new LuceneInvertedIndexReader.LuceneResultCollector(bitmap);
+        searcher.search(q, collector);
+
+        // 4. display results
+        System.out.println("Query string: " + querystr);
+        System.out.println("Found " + bitmap.getCardinality() + " hits.");
+      }
+    }
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
index 838b6d1..c299257 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
@@ -36,7 +36,7 @@ public class ZookeeperLauncher {
private ZkServer _zkServer;
public ZookeeperLauncher() {
- this("/tmp");
+ this(org.apache.commons.io.FileUtils.getTempDirectoryPath());
}
public ZookeeperLauncher(String baseTempDir) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org