You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/10/28 20:00:02 UTC
[incubator-pinot] 02/02: Adding index creator and reader
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit bdfd3f105d170f3dc02f0a25feb3b922fe3ffc6c
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 18 22:05:51 2020 -0700
Adding index creator and reader
---
.../io/util/VarLengthBytesValueReaderWriter.java | 28 +-
.../operator/filter/BitmapBasedFilterOperator.java | 4 +-
.../core/operator/filter/FilterOperatorUtils.java | 2 +-
.../operator/filter/JSONMatchFilterOperator.java | 138 +++++
.../transform/function/CastTransformFunction.java | 2 +-
.../{Predicate.java => JSONMatchPredicate.java} | 57 +-
.../query/request/context/predicate/Predicate.java | 3 +-
.../core/segment/creator/impl/V1Constants.java | 1 +
.../segment/creator/impl/inv/JSONIndexCreator.java | 656 +++++++++++++++++++++
.../creator/impl/inv/NestedObjectIndexCreator.java | 158 -----
.../segment/index/loader/IndexLoadingConfig.java | 10 +
.../segment/index/loader/SegmentPreProcessor.java | 6 +
.../loader/invertedindex/JSONIndexHandler.java | 205 +++++++
.../segment/index/readers/JSONIndexReader.java | 147 +++++
.../pinot/core/segment/store/ColumnIndexType.java | 3 +-
.../core/segment/store/FilePerIndexDirectory.java | 5 +
.../pinot/spi/config/table/IndexingConfig.java | 9 +
.../java/org/apache/pinot/spi/data/FieldSpec.java | 2 +-
18 files changed, 1249 insertions(+), 187 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 5a6a25a..88f56f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.io.util;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -63,19 +66,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
/**
* Magic bytes used to identify the dictionary files written in variable length bytes format.
*/
- private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+ public static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
/**
* Increment this version if there are any structural changes in the store format and
* deal with backward compatibility correctly based on old versions.
*/
- private static final int VERSION = 1;
+ public static final int VERSION = 1;
// Offsets of different fields in the header. Having as constants for readability.
- private static final int VERSION_OFFSET = MAGIC_BYTES.length;
- private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
- private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
- private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+ public static final int VERSION_OFFSET = MAGIC_BYTES.length;
+ public static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+ public static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+ public static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
private final PinotDataBuffer _dataBuffer;
@@ -144,6 +147,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
return false;
}
+  /**
+   * Serializes the store header into a standalone byte array.
+   *
+   * <p>The bytes are, in order: {@link #MAGIC_BYTES}, {@link #VERSION}, the given number of elements, and
+   * {@link #HEADER_LENGTH}. The integers are written through {@link DataOutputStream#writeInt(int)}.
+   *
+   * @param numElements number of elements recorded in the header
+   * @return the serialized header
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  public static byte[] getHeaderBytes(int numElements)
+      throws IOException {
+    ByteArrayOutputStream headerBuffer = new ByteArrayOutputStream(HEADER_LENGTH);
+    try (DataOutputStream headerWriter = new DataOutputStream(headerBuffer)) {
+      headerWriter.write(MAGIC_BYTES);
+      headerWriter.writeInt(VERSION);
+      headerWriter.writeInt(numElements);
+      headerWriter.writeInt(HEADER_LENGTH);
+    }
+    return headerBuffer.toByteArray();
+  }
+
private void writeHeader() {
for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
_dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index d9c25e1..9c307a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -38,9 +38,9 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {
private final boolean _exclusive;
private final int _numDocs;
- BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) {
+ public BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, InvertedIndexReader invertedIndexReader, int numDocs) {
_predicateEvaluator = predicateEvaluator;
- _invertedIndexReader = dataSource.getInvertedIndex();
+ _invertedIndexReader = invertedIndexReader;
_docIds = null;
_exclusive = predicateEvaluator.isExclusive();
_numDocs = numDocs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index 3bc676e..5076ef6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -65,7 +65,7 @@ public class FilterOperatorUtils {
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
}
if (dataSource.getInvertedIndex() != null) {
- return new BitmapBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+ return new BitmapBasedFilterOperator(predicateEvaluator, dataSource.getInvertedIndex(), numDocs);
}
return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
new file mode 100644
index 0000000..adb0550
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.InPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotInPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Filter operator that evaluates a filter tree against the JSON index of a single column.
+ *
+ * <p>AND/OR nodes combine the bitmaps produced by their children. Each leaf predicate on a JSON key is rewritten
+ * into a predicate on the indexed column whose values are posting-list keys of the form
+ * {@code key + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + value}, and is then resolved by the
+ * {@link JSONIndexReader}.
+ */
+@SuppressWarnings("rawtypes")
+public class JSONMatchFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "JSONMatchFilterOperator";
+
+  private final JSONIndexReader _nestedObjectIndexReader;
+  private final int _numDocs;
+  private final String _column;
+  private final FilterContext _filterContext;
+
+  /**
+   * @param column name of the column that carries the JSON index
+   * @param filterContext filter tree to evaluate against the index
+   * @param nestedObjectIndexReader reader used to resolve the rewritten predicates
+   * @param numDocs number of documents, passed through to the resulting doc id set
+   */
+  public JSONMatchFilterOperator(String column, FilterContext filterContext, JSONIndexReader nestedObjectIndexReader,
+      int numDocs) {
+    _column = column;
+    _filterContext = filterContext;
+    _nestedObjectIndexReader = nestedObjectIndexReader;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    ImmutableRoaringBitmap docIds = process(_filterContext);
+    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs));
+  }
+
+  /**
+   * Recursively evaluates the given filter node.
+   *
+   * @param filterContext node to evaluate
+   * @return the combined bitmap, or {@code null} for an AND/OR node without children
+   */
+  private MutableRoaringBitmap process(FilterContext filterContext) {
+    switch (filterContext.getType()) {
+      case AND: {
+        MutableRoaringBitmap resultBitmap = null;
+        // The children must come from the node being processed, not from the root filter.
+        for (FilterContext child : filterContext.getChildren()) {
+          MutableRoaringBitmap childBitmap = process(child);
+          if (resultBitmap == null) {
+            resultBitmap = childBitmap;
+          } else {
+            resultBitmap.and(childBitmap);
+          }
+        }
+        return resultBitmap;
+      }
+      case OR: {
+        MutableRoaringBitmap resultBitmap = null;
+        for (FilterContext child : filterContext.getChildren()) {
+          MutableRoaringBitmap childBitmap = process(child);
+          if (resultBitmap == null) {
+            resultBitmap = childBitmap;
+          } else {
+            resultBitmap.or(childBitmap);
+          }
+        }
+        return resultBitmap;
+      }
+      case PREDICATE:
+        return _nestedObjectIndexReader.getMatchingDocIds(rewritePredicate(filterContext.getPredicate()));
+      default:
+        throw new IllegalStateException("Unsupported filter type: " + filterContext.getType());
+    }
+  }
+
+  /**
+   * Rewrites a predicate on a JSON key into a predicate on the indexed column.
+   *
+   * @param predicate leaf predicate whose left-hand side identifies the JSON key
+   * @return the predicate to hand to the index reader
+   * @throws UnsupportedOperationException if the predicate type cannot be served by the JSON index
+   */
+  private Predicate rewritePredicate(Predicate predicate) {
+    ExpressionContext columnExpression = ExpressionContext.forIdentifier(_column);
+    String key = predicate.getLhs().getIdentifier();
+    switch (predicate.getType()) {
+      case EQ:
+        return new EqPredicate(columnExpression, toPostingListKey(key, ((EqPredicate) predicate).getValue()));
+      case NOT_EQ:
+        return new NotEqPredicate(columnExpression, toPostingListKey(key, ((NotEqPredicate) predicate).getValue()));
+      case IN:
+        return new InPredicate(columnExpression, toPostingListKeys(key, ((InPredicate) predicate).getValues()));
+      case NOT_IN:
+        // Must read the values of the NOT_IN predicate itself and stay a NOT_IN after the rewrite.
+        return new NotInPredicate(columnExpression,
+            toPostingListKeys(key, ((NotInPredicate) predicate).getValues()));
+      case IS_NULL:
+      case IS_NOT_NULL:
+        // Passed to the reader unchanged.
+        return predicate;
+      case RANGE:
+      case REGEXP_LIKE:
+      case TEXT_MATCH:
+        throw new UnsupportedOperationException("JSON Match does not support RANGE, REGEXP or TEXTMATCH");
+      default:
+        // Fail loudly instead of handing a null predicate to the reader.
+        throw new UnsupportedOperationException("JSON Match does not support predicate type: " + predicate.getType());
+    }
+  }
+
+  /** Joins a JSON key and a value into the posting-list key used by the index. */
+  private static String toPostingListKey(String key, String value) {
+    return key + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + value;
+  }
+
+  /** Applies {@link #toPostingListKey(String, String)} to every value, preserving order. */
+  private static List<String> toPostingListKeys(String key, List<String> values) {
+    return values.stream().map(value -> toPostingListKey(key, value)).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
index c826446..cb6227f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToSDFTra
import org.apache.pinot.core.plan.DocIdSetPlanNode;
-public class CastTransformFunction extends BaseTransformFunction {
+ public class CastTransformFunction extends BaseTransformFunction {
public static final String FUNCTION_NAME = "cast";
private TransformFunction _transformFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
index 203e950..97ad787 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
@@ -18,30 +18,55 @@
*/
package org.apache.pinot.core.query.request.context.predicate;
+import java.util.Objects;
import org.apache.pinot.core.query.request.context.ExpressionContext;
/**
- * The {@code Predicate} class represents the predicate in the filter.
- * <p>Currently the query engine only accepts string literals as the right-hand side of the predicate, so we store the
- * right-hand side of the predicate as string or list of strings.
+ * Predicate for JSON_MATCH.
*/
-public interface Predicate {
- enum Type {
- EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+public class JSONMatchPredicate implements Predicate {
+ private final ExpressionContext _lhs;
+ private final String _value;
- public boolean isExclusive() {
- return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
+ public JSONMatchPredicate(ExpressionContext lhs, String value) {
+ _lhs = lhs;
+ _value = value;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.JSON_MATCH;
+ }
+
+ @Override
+ public ExpressionContext getLhs() {
+ return _lhs;
+ }
+
+ public String getValue() {
+ return _value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JSONMatchPredicate)) {
+ return false;
}
+ JSONMatchPredicate that = (JSONMatchPredicate) o;
+ return Objects.equals(_lhs, that._lhs) && Objects.equals(_value, that._value);
}
- /**
- * Returns the type of the predicate.
- */
- Type getType();
+ @Override
+ public int hashCode() {
+ return Objects.hash(_lhs, _value);
+ }
- /**
- * Returns the left-hand side expression of the predicate.
- */
- ExpressionContext getLhs();
+ @Override
+ public String toString() {
+ return "json_match(" + _lhs + ",'" + _value + "')";
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
index 203e950..a204fc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
*/
public interface Predicate {
enum Type {
- EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+ EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL, JSON_MATCH;
public boolean isExclusive() {
return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
@@ -44,4 +44,5 @@ public interface Predicate {
* Returns the left-hand side expression of the predicate.
*/
ExpressionContext getLhs();
+
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..e745120 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+ public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx";
public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
new file mode 100644
index 0000000..705f690
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import com.google.common.io.Files;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.JSONMatchFilterOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexCreator implements Closeable {
+
+ //separator used to join the key and value to create posting list key
+ public static String POSTING_LIST_KEY_SEPARATOR = "|";
+ // add() calls flush() whenever docId is a multiple of this value
+ static int FLUSH_THRESHOLD = 50_000;
+ // format version; seal() writes it as the first int of the output index file
+ static int VERSION = 1;
+ // intermediate file receiving the mapping entries written by flush(); copied into the output file by seal()
+ private final File flattenedDocId2RootDocIdMappingFile;
+ // intermediate file receiving one chunk of (key, posting list) entries per flush()
+ private final File postingListFile;
+ // files handed to the Writer in createInvertedIndex() and later copied into the output file by seal()
+ private File dictionaryheaderFile;
+ private File dictionaryOffsetFile;
+ private File dictionaryFile;
+ private File invertedIndexOffsetFile;
+ private File invertedIndexFile;
+ // final single-file index assembled by seal()
+ private File outputIndexFile;
+
+ // incremented once per add() call
+ private int docId = 0;
+ // incremented once per flattened record produced by unnestJson(); used as the id stored in posting lists
+ private int numFlatennedDocId = 0;
+ // NOTE(review): not read or written in the visible part of this class - confirm it is still needed
+ int chunkId = 0;
+
+ private DataOutputStream postingListWriter;
+ private DataOutputStream flattenedDocId2RootDocIdWriter;
+
+ // in-memory posting lists since the last flush(); a TreeMap, so flush() writes keys in String order
+ Map<String, List<Integer>> postingListMap = new TreeMap<>();
+ // mapping entries buffered since the last flush(), one per flattened record
+ List<Integer> flattenedDocIdList = new ArrayList<>();
+ // value of postingListWriter.size() at the start of each chunk (first entry is added by the constructor)
+ List<Integer> postingListChunkOffsets = new ArrayList<>();
+ // number of keys written in each chunk, added by flush()
+ List<Integer> chunkLengths = new ArrayList<>();
+ private FieldSpec fieldSpec;
+
+  /**
+   * Creates the index creator and opens the two intermediate files that are written while documents are added.
+   *
+   * <p>All intermediate files and the output file are placed in {@code indexDir} and are prefixed with the
+   * column name taken from {@code fieldSpec}.
+   *
+   * @param indexDir directory in which the intermediate and output files are created
+   * @param fieldSpec field spec of the indexed column
+   * @param valueType currently unused
+   * @throws IOException if one of the intermediate files cannot be opened for writing
+   */
+  public JSONIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType)
+      throws IOException {
+    this.fieldSpec = fieldSpec;
+    String name = fieldSpec.getName();
+
+    // Resolve against indexDir like every other file below; concatenating the directory and the name as a
+    // single string dropped the path separator and created the file next to (not inside) the index directory.
+    postingListFile = new File(indexDir, name + "_postingList.buf");
+    postingListWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(postingListFile)));
+    // The first chunk starts at the current (zero) position of the posting list writer.
+    postingListChunkOffsets.add(postingListWriter.size());
+
+    dictionaryheaderFile = new File(indexDir, name + "_dictionaryHeader.buf");
+    dictionaryOffsetFile = new File(indexDir, name + "_dictionaryOffset.buf");
+    dictionaryFile = new File(indexDir, name + "_dictionary.buf");
+    invertedIndexOffsetFile = new File(indexDir, name + "_invertedIndexOffset.buf");
+    invertedIndexFile = new File(indexDir, name + "_invertedIndex.buf");
+    flattenedDocId2RootDocIdMappingFile = new File(indexDir, name + "_flattenedDocId.buf");
+    flattenedDocId2RootDocIdWriter =
+        new DataOutputStream(new BufferedOutputStream(new FileOutputStream(flattenedDocId2RootDocIdMappingFile)));
+
+    // output file
+    // NOTE(review): V1Constants.JSON_INDEX_FILE_EXTENSION is ".json.idx" - confirm which extension readers expect.
+    outputIndexFile = new File(indexDir, name + ".nested.idx");
+  }
+
+  /** Shared JSON parser, reused across calls instead of being rebuilt for every value. */
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Indexes a single-value JSON document.
+   *
+   * @param data serialized JSON of the document
+   * @throws IOException if the JSON cannot be parsed or a flush to disk fails
+   */
+  public void add(byte[] data)
+      throws IOException {
+    addFlattenedRecords(data);
+    finishDocument();
+  }
+
+  /**
+   * Indexes a multi-value JSON document; all values belong to the same root document.
+   *
+   * @param dataArray serialized JSON values of the document
+   * @param length number of leading entries of {@code dataArray} to index
+   * @throws IOException if a value cannot be parsed or a flush to disk fails
+   */
+  public void add(byte[][] dataArray, int length)
+      throws IOException {
+    for (int i = 0; i < length; i++) {
+      addFlattenedRecords(dataArray[i]);
+    }
+    finishDocument();
+  }
+
+  /**
+   * Flattens one JSON value and registers every flattened record in the in-memory posting lists, both under
+   * its bare key and under {@code key + POSTING_LIST_KEY_SEPARATOR + value}.
+   */
+  private void addFlattenedRecords(byte[] data)
+      throws IOException {
+    JsonNode jsonNode = JSON_OBJECT_MAPPER.readTree(data);
+    for (Map<String, String> flattenedRecord : unnestJson(jsonNode)) {
+      for (Map.Entry<String, String> entry : flattenedRecord.entrySet()) {
+        String key = entry.getKey();
+        // key posting list
+        postingListMap.computeIfAbsent(key, k -> new ArrayList<>()).add(numFlatennedDocId);
+        // key-value posting list
+        String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+        postingListMap.computeIfAbsent(keyValue, k -> new ArrayList<>()).add(numFlatennedDocId);
+      }
+      // flattenedDocId -> rootDocId: the position in the list is the flattened doc id, so the stored value
+      // has to be the root doc id (storing the flattened id again produced an identity mapping).
+      flattenedDocIdList.add(docId);
+      numFlatennedDocId++;
+    }
+  }
+
+  /** Advances to the next root document and flushes the in-memory state every FLUSH_THRESHOLD documents. */
+  private void finishDocument()
+      throws IOException {
+    docId++;
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  /**
+   * Finishes index creation: flushes the remaining in-memory state, merges the posting list chunks into the
+   * dictionary and inverted index files, and assembles everything into the single output index file.
+   *
+   * <p>Output layout (big endian):
+   * <ul>
+   *   <li>int version, int max key length as returned by {@code createInvertedIndex}</li>
+   *   <li>six (long start offset, long length) pairs, one per section</li>
+   *   <li>the sections, in order: dictionary header, dictionary offsets, dictionary, inverted index offsets,
+   *   inverted index, flattened doc id to root doc id mapping</li>
+   * </ul>
+   *
+   * @throws IOException if reading an intermediate file or writing the output file fails
+   */
+  public void seal()
+      throws IOException {
+
+    flush();
+
+    flattenedDocId2RootDocIdWriter.close();
+    postingListWriter.close();
+
+    // merge the per-flush chunks into the dictionary + inverted index files
+    int maxKeyLength = createInvertedIndex(postingListFile, postingListChunkOffsets, chunkLengths);
+
+    // sections in the order they are laid out in the output file
+    File[] sectionFiles = {
+        dictionaryheaderFile, dictionaryOffsetFile, dictionaryFile, invertedIndexOffsetFile, invertedIndexFile,
+        flattenedDocId2RootDocIdMappingFile
+    };
+
+    // header: version + max key length + (start offset, length) for each section
+    long headerSize = 2 * Integer.BYTES + sectionFiles.length * 2 * Long.BYTES;
+    long dataSize = 0;
+    for (File sectionFile : sectionFiles) {
+      dataSize += sectionFile.length();
+    }
+    long totalSize = headerSize + dataSize;
+
+    // Close the mapped buffer once everything is written; it was previously left open.
+    try (PinotDataBuffer pinotDataBuffer = PinotDataBuffer
+        .mapFile(outputIndexFile, false, 0, totalSize, ByteOrder.BIG_ENDIAN, "Nested inverted index")) {
+      pinotDataBuffer.putInt(0, VERSION);
+      pinotDataBuffer.putInt(Integer.BYTES, maxKeyLength);
+
+      long writtenBytes = headerSize;
+      for (int bufferId = 0; bufferId < sectionFiles.length; bufferId++) {
+        File sectionFile = sectionFiles[bufferId];
+        long sectionLength = sectionFile.length();
+        long headerEntryOffset = getBufferStartOffset(bufferId);
+        pinotDataBuffer.putLong(headerEntryOffset, writtenBytes);
+        pinotDataBuffer.putLong(headerEntryOffset + Long.BYTES, sectionLength);
+        pinotDataBuffer.readFrom(writtenBytes, sectionFile, 0, sectionLength);
+        writtenBytes += sectionLength;
+      }
+    }
+  }
+
+  /**
+   * Returns the position, inside the output file header, of the (start offset, length) pair that describes the
+   * section with the given id. The pairs follow two leading ints and each pair occupies two longs.
+   */
+  private long getBufferStartOffset(int bufferId) {
+    int leadingIntsBytes = 2 * Integer.BYTES;
+    int bytesPerEntry = 2 * Long.BYTES;
+    return leadingIntsBytes + bufferId * bytesPerEntry;
+  }
+
+  /**
+   * Merges the sorted posting list chunks written by {@code flush()} into the dictionary and inverted index
+   * files using a k-way merge: posting lists of equal keys coming from different chunks are combined into one
+   * bitmap, and every distinct key is handed to the {@code Writer} exactly once, in key order.
+   *
+   * <p>NOTE(review): chunks are written in {@code String} order (TreeMap) but merged in unsigned UTF-8 byte
+   * order; these orders can differ for keys containing supplementary characters - confirm this is acceptable.
+   *
+   * @param postingListFile file containing all chunks back to back
+   * @param postingListChunkOffsets byte offset of each chunk within {@code postingListFile}
+   * @param chunkLengthList number of keys in each chunk
+   * @return the value reported by {@code Writer.getMaxDictionaryValueLength()}
+   * @throws IOException if the posting list file cannot be opened or positioned
+   */
+  private int createInvertedIndex(File postingListFile, List<Integer> postingListChunkOffsets,
+      List<Integer> chunkLengthList)
+      throws IOException {
+
+    // One reader per chunk; all of them are closed in the finally block (they were previously leaked).
+    List<DataInputStream> chunkReaders = new ArrayList<>();
+    try {
+      List<Iterator<ImmutablePair<byte[], int[]>>> chunkIterators = new ArrayList<>();
+
+      for (int i = 0; i < chunkLengthList.size(); i++) {
+        final DataInputStream postingListFileReader =
+            new DataInputStream(new BufferedInputStream(new FileInputStream(postingListFile)));
+        chunkReaders.add(postingListFileReader);
+
+        // skipBytes() may skip fewer bytes than requested; reading from the wrong position would corrupt the index
+        int chunkOffset = postingListChunkOffsets.get(i);
+        if (postingListFileReader.skipBytes(chunkOffset) != chunkOffset) {
+          throw new IOException("Failed to seek to offset " + chunkOffset + " in " + postingListFile);
+        }
+
+        final int length = chunkLengthList.get(i);
+        chunkIterators.add(new Iterator<ImmutablePair<byte[], int[]>>() {
+          int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            return index < length;
+          }
+
+          @Override
+          public ImmutablePair<byte[], int[]> next() {
+            try {
+              int keyLength = postingListFileReader.readInt();
+              byte[] keyBytes = new byte[keyLength];
+              // read() may return before the array is filled; readFully() guarantees the whole key is read
+              postingListFileReader.readFully(keyBytes);
+
+              int postingListLength = postingListFileReader.readInt();
+              int[] postingList = new int[postingListLength];
+              for (int j = 0; j < postingListLength; j++) {
+                postingList[j] = postingListFileReader.readInt();
+              }
+              index++;
+              return ImmutablePair.of(keyBytes, postingList);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      final Comparator<byte[]> byteArrayComparator = UnsignedBytes.lexicographicalComparator();
+
+      // Queue entries are (chunk index, (key, posting list)), ordered by key. PriorityQueue rejects an initial
+      // capacity below 1, so guard against an empty chunk list.
+      PriorityQueue<ImmutablePair<Integer, ImmutablePair<byte[], int[]>>> queue =
+          new PriorityQueue<>(Math.max(1, chunkLengthList.size()),
+              (o1, o2) -> byteArrayComparator.compare(o1.getRight().getLeft(), o2.getRight().getLeft()));
+      for (int i = 0; i < chunkIterators.size(); i++) {
+        Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(i);
+        if (iterator.hasNext()) {
+          queue.offer(ImmutablePair.of(i, iterator.next()));
+        }
+      }
+      byte[] prevKey = null;
+      RoaringBitmap roaringBitmap = new RoaringBitmap();
+
+      Writer writer = new Writer(dictionaryheaderFile, dictionaryOffsetFile, dictionaryFile, invertedIndexOffsetFile,
+          invertedIndexFile);
+      while (!queue.isEmpty()) {
+        ImmutablePair<Integer, ImmutablePair<byte[], int[]>> poll = queue.poll();
+        byte[] currKey = poll.getRight().getLeft();
+        // key changed: everything accumulated so far belongs to prevKey, emit it and start over
+        if (prevKey != null && byteArrayComparator.compare(prevKey, currKey) != 0) {
+          writer.add(prevKey, roaringBitmap);
+          roaringBitmap.clear();
+        }
+
+        roaringBitmap.add(poll.getRight().getRight());
+        prevKey = currKey;
+
+        //add the next key from the chunk where the currKey was removed from
+        Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(poll.getLeft());
+        if (iterator.hasNext()) {
+          queue.offer(ImmutablePair.of(poll.getLeft(), iterator.next()));
+        }
+      }
+
+      // emit the last key
+      if (prevKey != null) {
+        writer.add(prevKey, roaringBitmap);
+      }
+      writer.finish();
+      return writer.getMaxDictionaryValueLength();
+    } finally {
+      for (DataInputStream chunkReader : chunkReaders) {
+        try {
+          chunkReader.close();
+        } catch (IOException ignored) {
+          // best effort: the streams are read-only and must not mask a failure of the merge itself
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes the in-memory state to the intermediate files as one chunk and clears it.
+   *
+   * <p>For every key of {@code postingListMap}, in map order, the posting list file receives: key length, key
+   * bytes (UTF-8), posting list length, and the posting list entries. The buffered entries of
+   * {@code flattenedDocIdList} are appended to the mapping file. The number of keys of the chunk and the
+   * writer position after the chunk (i.e. the start of the next chunk) are recorded for the merge.
+   *
+   * @throws IOException if writing to one of the intermediate files fails
+   */
+  private void flush()
+      throws IOException {
+    Charset utf8 = Charset.forName("UTF-8");
+
+    //write the key (length|actual bytes) - posting list(length, flattenedDocIds)
+    for (Map.Entry<String, List<Integer>> entry : postingListMap.entrySet()) {
+      byte[] keyBytes = entry.getKey().getBytes(utf8);
+      postingListWriter.writeInt(keyBytes.length);
+      postingListWriter.write(keyBytes);
+      // named differently from the flattenedDocIdList field to avoid shadowing it
+      List<Integer> postingList = entry.getValue();
+      postingListWriter.writeInt(postingList.size());
+      for (int flattenedDocId : postingList) {
+        postingListWriter.writeInt(flattenedDocId);
+      }
+    }
+
+    //write flattened doc id to root docId mapping
+    for (int mappingEntry : flattenedDocIdList) {
+      flattenedDocId2RootDocIdWriter.writeInt(mappingEntry);
+    }
+    chunkLengths.add(postingListMap.size());
+    postingListChunkOffsets.add(postingListWriter.size());
+    postingListMap.clear();
+    flattenedDocIdList.clear();
+  }
+
+ /**
+  * Flattens a JSON node into a list of flat (dotted-key to text-value) maps.
+  *
+  * <p>Direct scalar children are collected into one base map. Each object child is flattened recursively
+  * and yields one copy of the base map per flattened result, with the child's keys prefixed by the field
+  * name. Each element of each array child is then flattened the same way on top of every map produced so far.
+  *
+  * <p>NOTE(review): when array children exist but contain no elements, the returned list is empty, so the
+  * scalar fields of that node are not emitted at all. Array elements that are plain values (not objects)
+  * contribute no keys. Confirm both are intended.
+  *
+  * @param root node to flatten
+  * @return list of flattened maps; a node without object or array children yields a single map
+  */
+ private static List<Map<String, String>> unnestJson(JsonNode root) {
+   Map<String, String> scalarFields = new TreeMap<>();
+   Map<String, JsonNode> arrayFields = new TreeMap<>();
+   Map<String, JsonNode> objectFields = new TreeMap<>();
+
+   // Bucket the direct children by node kind; arrays and objects are expanded afterwards.
+   for (Iterator<Map.Entry<String, JsonNode>> fieldIterator = root.fields(); fieldIterator.hasNext(); ) {
+     Map.Entry<String, JsonNode> field = fieldIterator.next();
+     String fieldName = field.getKey();
+     JsonNode fieldValue = field.getValue();
+     if (fieldValue.isValueNode()) {
+       scalarFields.put(fieldName, fieldValue.asText());
+     } else if (fieldValue.isArray()) {
+       arrayFields.put(fieldName, fieldValue);
+     } else {
+       objectFields.put(fieldName, fieldValue);
+     }
+   }
+
+   // Expand nested objects on top of the scalar fields.
+   List<Map<String, String>> expandedObjects = new ArrayList<>();
+   for (Map.Entry<String, JsonNode> objectField : objectFields.entrySet()) {
+     modifyKeysInMap(scalarFields, expandedObjects, objectField.getKey(), objectField.getValue());
+   }
+   if (expandedObjects.isEmpty()) {
+     expandedObjects.add(scalarFields);
+   }
+
+   if (arrayFields.isEmpty()) {
+     return new ArrayList<>(expandedObjects);
+   }
+
+   // Expand every array element on top of every map produced so far.
+   List<Map<String, String>> expandedArrays = new ArrayList<>();
+   for (Map<String, String> baseMap : expandedObjects) {
+     for (Map.Entry<String, JsonNode> arrayField : arrayFields.entrySet()) {
+       for (JsonNode element : arrayField.getValue()) {
+         modifyKeysInMap(baseMap, expandedArrays, arrayField.getKey(), element);
+       }
+     }
+   }
+   return expandedArrays;
+ }
+
+ /**
+  * Flattens {@code arrNode} and, for every flattened result, appends to {@code resultList} a copy of
+  * {@code flattenedMap} extended with that result's entries, each key prefixed with
+  * {@code arrNodeKey + "."}.
+  *
+  * @param flattenedMap base entries copied into every produced map; not modified
+  * @param resultList list that receives the produced maps
+  * @param arrNodeKey field name used as the key prefix
+  * @param arrNode nested node (object or array element) to flatten
+  */
+ private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
+     String arrNodeKey, JsonNode arrNode) {
+   String keyPrefix = arrNodeKey + ".";
+   for (Map<String, String> nested : unnestJson(arrNode)) {
+     Map<String, String> merged = new TreeMap<>(flattenedMap);
+     nested.forEach((nestedKey, nestedValue) -> merged.put(keyPrefix + nestedKey, nestedValue));
+     resultList.add(merged);
+   }
+ }
+
+ /**
+  * No-op: this method releases nothing.
+  *
+  * <p>NOTE(review): the writers used by this creator are not closed here; confirm they are closed
+  * elsewhere (presumably when the index is sealed) so that try-with-resources callers do not leak them.
+  */
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+
+ /**
+  * Writes the final dictionary (header, offsets, values) and inverted index (offsets, serialized bitmaps)
+  * into five separate files.
+  *
+  * <p>Keys are appended one at a time via {@link #add(byte[], RoaringBitmap)}. {@link #finish()} must be
+  * called exactly once afterwards: it appends the trailing offset entries, writes the dictionary header,
+  * closes all streams and rebases the offsets stored in the two offset files.
+  */
+ private class Writer {
+   private final DataOutputStream _dictionaryHeaderWriter;
+   private final DataOutputStream _dictionaryOffsetWriter;
+   private final File _dictionaryOffsetFile;
+   private final DataOutputStream _dictionaryWriter;
+   private final DataOutputStream _invertedIndexOffsetWriter;
+   private final File _invertedIndexOffsetFile;
+   private final DataOutputStream _invertedIndexWriter;
+   // Number of keys added so far; also the dictionary id of the next key.
+   private int _dictId;
+   // Running byte offsets, relative to the start of the respective data section.
+   private int _dictOffset;
+   private int _invertedIndexOffset;
+   // Stays at Integer.MIN_VALUE until the first key is added.
+   int _maxDictionaryValueLength = Integer.MIN_VALUE;
+
+   /**
+    * Opens buffered output streams on all five target files.
+    *
+    * @throws IOException if any of the files cannot be opened for writing
+    */
+   public Writer(File dictionaryheaderFile, File dictionaryOffsetFile, File dictionaryFile,
+       File invertedIndexOffsetFile, File invertedIndexFile)
+       throws IOException {
+     _dictionaryHeaderWriter =
+         new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryheaderFile)));
+
+     _dictionaryOffsetWriter =
+         new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryOffsetFile)));
+     _dictionaryOffsetFile = dictionaryOffsetFile;
+     _dictionaryWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryFile)));
+     _invertedIndexOffsetWriter =
+         new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexOffsetFile)));
+     _invertedIndexOffsetFile = invertedIndexOffsetFile;
+     _invertedIndexWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexFile)));
+     _dictId = 0;
+     _dictOffset = 0;
+     _invertedIndexOffset = 0;
+   }
+
+   /**
+    * Appends one dictionary entry and its posting bitmap.
+    *
+    * @param key dictionary value bytes
+    * @param roaringBitmap doc ids for the key; serialized immediately, so the caller may reuse the instance
+    * @throws IOException if writing to any of the streams fails
+    */
+   public void add(byte[] key, RoaringBitmap roaringBitmap)
+       throws IOException {
+     if (key.length > _maxDictionaryValueLength) {
+       _maxDictionaryValueLength = key.length;
+     }
+     //write the key to dictionary
+     _dictionaryOffsetWriter.writeInt(_dictOffset);
+     _dictionaryWriter.write(key);
+
+     //write the roaringBitmap to inverted index
+     _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+
+     int serializedSizeInBytes = roaringBitmap.serializedSizeInBytes();
+     byte[] serializedRoaringBitmap = new byte[serializedSizeInBytes];
+     roaringBitmap.serialize(ByteBuffer.wrap(serializedRoaringBitmap));
+     _invertedIndexWriter.write(serializedRoaringBitmap);
+
+     //increment offsets
+     _dictOffset += key.length;
+     _invertedIndexOffset += serializedSizeInBytes;
+     //increment the dictionary id
+     _dictId++;
+   }
+
+   /**
+    * Completes all files: appends the trailing offsets, writes the dictionary header, closes the streams
+    * and rebases the stored offsets.
+    *
+    * @throws IOException if writing, closing or re-mapping any of the files fails
+    */
+   void finish()
+       throws IOException {
+     //InvertedIndexReader and VarlengthBytesValueReaderWriter needs one extra entry for offsets since it computes the length for index i using offset[i+1] - offset[i]
+     _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+     _dictionaryOffsetWriter.writeInt(_dictOffset);
+
+     _dictionaryHeaderWriter.write(VarLengthBytesValueReaderWriter.getHeaderBytes(_dictId));
+
+     _dictionaryHeaderWriter.close();
+     _dictionaryOffsetWriter.close();
+     _dictionaryWriter.close();
+     _invertedIndexOffsetWriter.close();
+     _invertedIndexWriter.close();
+
+     //data offsets started with zero but the actual dictionary and index will contain (header + offsets + data). so all the offsets must be adjusted ( i.e add size(header) + size(offset) to each offset value)
+     int numOffsets = _dictId + 1;
+     // DataOutputStream.size() is the number of bytes written so far and is still readable after close().
+     int dictOffsetBase = _dictionaryHeaderWriter.size() + _dictionaryOffsetWriter.size();
+     rebaseOffsets(_dictionaryOffsetFile, "dictionary offset file", numOffsets, dictOffsetBase);
+
+     int invIndexOffsetBase = _invertedIndexOffsetWriter.size();
+     rebaseOffsets(_invertedIndexOffsetFile, "invertedIndexOffsetFile", numOffsets, invIndexOffsetBase);
+   }
+
+   /**
+    * Adds {@code base} to each of the first {@code numOffsets} big-endian ints stored in {@code offsetFile}.
+    */
+   private void rebaseOffsets(File offsetFile, String description, int numOffsets, int base)
+       throws IOException {
+     try (PinotDataBuffer offsetBuffer = PinotDataBuffer
+         .mapFile(offsetFile, false, 0, offsetFile.length(), ByteOrder.BIG_ENDIAN, description)) {
+       for (int i = 0; i < numOffsets; i++) {
+         int position = i * Integer.BYTES;
+         offsetBuffer.putInt(position, offsetBuffer.getInt(position) + base);
+       }
+     }
+   }
+
+   /**
+    * @return length in bytes of the longest key added so far, or {@code Integer.MIN_VALUE} if none was added
+    */
+   public int getMaxDictionaryValueLength() {
+     return _maxDictionaryValueLength;
+   }
+ }
+
+ /**
+  * Manual smoke test: flattens a sample JSON document, builds a JSON index for it in a temporary
+  * directory, then queries the index through {@link JSONIndexReader} and {@link JSONMatchFilterOperator},
+  * printing intermediate results to stdout.
+  *
+  * <p>Change the {@code json} assignment to try one of the other sample documents.
+  */
+ public static void main(String[] args)
+     throws Exception {
+
+   String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
+   String json1 =
+       " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\" } }";
+   String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
+   String json3 = "{\n" + " \"name\" : \"adam\",\n" + " \"age\" : 30,\n" + " \"country\" : \"us\",\n"
+       + " \"addresses\" : [{\n" + " \"number\" : 1,\n" + " \"street\" : \"main st\",\n"
+       + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 2,\n" + " \"street\" : \"second st\",\n"
+       + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 3,\n" + " \"street\" : \"third st\",\n"
+       + " \"country\" : \"us\"\n" + " }]\n" + "}\n";
+
+   String json4 =
+       "{\n" + " \"year\": [\n" + " 2018\n" + " ],\n" + " \"customers\": [\n" + " {\n"
+           + " \"name\": \"John\",\n" + " \"contact\": [\n" + " {\n"
+           + " \"phone\": \"home\",\n" + " \"number\": \"333-3334\"\n"
+           + " }\n" + " ]\n" + " },\n" + " {\n"
+           + " \"name\": \"Jane\",\n" + " \"contact\": [\n" + " {\n"
+           + " \"phone\": \"home\",\n" + " \"number\": \"555-5556\"\n"
+           + " }\n" + " ],\n" + " \"surname\": \"Shaw\"\n" + " }\n"
+           + " ]\n" + "}";
+
+   String json5 = "{ \n" + " \"accounting\" : [ \n" + " { \"firstName\" : \"John\", \n"
+       + " \"lastName\" : \"Doe\",\n" + " \"age\" : 23 },\n" + "\n"
+       + " { \"firstName\" : \"Mary\", \n" + " \"lastName\" : \"Smith\",\n"
+       + " \"age\" : 32 }\n" + " ], \n"
+       + " \"sales\" : [ \n" + " { \"firstName\" : \"Sally\", \n"
+       + " \"lastName\" : \"Green\",\n" + " \"age\" : 27 },\n"
+       + "\n" + " { \"firstName\" : \"Jim\", \n"
+       + " \"lastName\" : \"Galley\",\n" + " \"age\" : 41 }\n"
+       + " ] \n" + "} ";
+
+   String json = json3;
+   System.out.println("json = " + json);
+   JsonNode rawJsonNode = new ObjectMapper().readTree(json);
+
+   System.out.println(
+       "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
+   String flattenJson = JsonFlattener.flatten(json);
+
+   System.out.println("flattenJson = " + flattenJson);
+   JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
+
+   System.out
+       .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+   Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
+   System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
+   FieldSpec fieldSpec = new DimensionFieldSpec();
+   fieldSpec.setName("person");
+   File tempDir = Files.createTempDir();
+   JSONIndexCreator creator = new JSONIndexCreator(tempDir, fieldSpec, FieldSpec.DataType.BYTES);
+   // unnestJson is static; call it directly rather than through the instance.
+   List<Map<String, String>> maps = unnestJson(rawJsonNode);
+   System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
+   creator.add(json.getBytes());
+
+   creator.seal();
+   System.out.println("Output Dir = " + tempDir);
+   System.out.println("FileUtils.listFiles(tempDir, null, true) = " + FileUtils.listFiles(tempDir, null, true).stream()
+       .map(file -> file.getName()).collect(Collectors.toList()));
+
+   //Test reader
+   PinotDataBuffer buffer =
+       PinotDataBuffer.mapReadOnlyBigEndianFile(new File(tempDir, fieldSpec.getName() + ".nested.idx"));
+   JSONIndexReader reader = new JSONIndexReader(buffer);
+   ExpressionContext lhs = ExpressionContext.forIdentifier("person");
+   Predicate predicate = new EqPredicate(lhs, "addresses.street" + POSTING_LIST_KEY_SEPARATOR + "third st");
+   MutableRoaringBitmap matchingDocIds = reader.getMatchingDocIds(predicate);
+   System.out.println("matchingDocIds = " + matchingDocIds);
+
+   //Test filter operator
+   FilterContext filterContext = QueryContextConverterUtils
+       .getFilter(CalciteSqlParser.compileToExpression("name='adam' AND addresses.street='main st'"));
+   int numDocs = 1;
+   JSONMatchFilterOperator operator = new JSONMatchFilterOperator("person", filterContext, reader, numDocs);
+   FilterBlock filterBlock = operator.nextBlock();
+   BlockDocIdIterator iterator = filterBlock.getBlockDocIdSet().iterator();
+   int docId;
+   // Doc ids start at 0 (the only document indexed here is doc 0), so 0 must be printed too. Any negative
+   // value is treated as the end-of-iteration marker.
+   while ((docId = iterator.next()) >= 0) {
+     System.out.println("docId = " + docId);
+   }
+ }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
deleted file mode 100644
index adfa19d..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.creator.impl.inv;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.wnameless.json.flattener.JsonFlattener;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.pinot.spi.data.FieldSpec;
-
-
-public class NestedObjectIndexCreator {
-
- public NestedObjectIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType) {
-
- }
-
- private static List<Map<String, String>> unnestJson(JsonNode root) {
- Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
- Map<String, String> flattenedSingleValuesMap = new TreeMap<>();
- Map<String, JsonNode> arrNodes = new TreeMap<>();
- Map<String, JsonNode> objectNodes = new TreeMap<>();
- List<Map<String, String>> resultList = new ArrayList<>();
- List<Map<String, String>> tempResultList = new ArrayList<>();
- while (fields.hasNext()) {
- Map.Entry<String, JsonNode> child = fields.next();
- if (child.getValue().isValueNode()) {
- //Normal value node
- flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText());
- } else if (child.getValue().isArray()) {
- //Array Node: Process these nodes later
- arrNodes.put(child.getKey(), child.getValue());
- } else {
- //Object Node
- objectNodes.put(child.getKey(), child.getValue());
- }
- }
- for (String objectNodeKey : objectNodes.keySet()) {
- JsonNode objectNode = objectNodes.get(objectNodeKey);
- modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode);
- }
- if (tempResultList.isEmpty()) {
- tempResultList.add(flattenedSingleValuesMap);
- }
- if (!arrNodes.isEmpty()) {
- for (Map<String, String> flattenedMapElement : tempResultList) {
- for (String arrNodeKey : arrNodes.keySet()) {
- JsonNode arrNode = arrNodes.get(arrNodeKey);
- for (JsonNode arrNodeElement : arrNode) {
- modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement);
- }
- }
- }
- } else {
- resultList.addAll(tempResultList);
- }
- return resultList;
- }
-
- private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
- String arrNodeKey, JsonNode arrNode) {
- List<Map<String, String>> objectResult = unnestJson(arrNode);
- for (Map<String, String> flattenedObject : objectResult) {
- Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap);
- for (Map.Entry<String, String> entry : flattenedObject.entrySet()) {
- flattenedObjectCopy.put(arrNodeKey + "." + entry.getKey(), entry.getValue());
- }
- resultList.add(flattenedObjectCopy);
- }
- }
-
- public void add(byte[] data)
- throws IOException {
-
- JsonNode jsonNode = new ObjectMapper().readTree(data);
- List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
- for (Map<String, String> map : flattenedMapList) {
-
- }
- }
-
- public static void main(String[] args)
- throws IOException {
-
- String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
- String json1 =
- " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\" } }";
- String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
- String json3 = "{\n" + " \"name\" : \"adam\",\n" + " \"age\" : 30,\n" + " \"country\" : \"us\",\n"
- + " \"addresses\" : [{\n" + " \"number\" : 1,\n" + " \"street\" : \"main st\",\n"
- + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 2,\n" + " \"street\" : \"second st\",\n"
- + " \"country\" : \"us\"\n" + " }, {\n" + " \"number\" : 3,\n" + " \"street\" : \"third st\",\n"
- + " \"country\" : \"us\"\n" + " }]\n" + "}\n";
-
- String json4 =
- "{\n" + " \"year\": [\n" + " 2018\n" + " ],\n" + " \"customers\": [\n" + " {\n"
- + " \"name\": \"John\",\n" + " \"contact\": [\n" + " {\n"
- + " \"phone\": \"home\",\n" + " \"number\": \"333-3334\"\n"
- + " }\n" + " ]\n" + " },\n" + " {\n"
- + " \"name\": \"Jane\",\n" + " \"contact\": [\n" + " {\n"
- + " \"phone\": \"home\",\n" + " \"number\": \"555-5556\"\n"
- + " }\n" + " ],\n" + " \"surname\": \"Shaw\"\n" + " }\n"
- + " ]\n" + "}";
-
- String json5 = "{ \n" + " \"accounting\" : [ \n" + " { \"firstName\" : \"John\", \n"
- + " \"lastName\" : \"Doe\",\n" + " \"age\" : 23 },\n" + "\n"
- + " { \"firstName\" : \"Mary\", \n" + " \"lastName\" : \"Smith\",\n"
- + " \"age\" : 32 }\n" + " ], \n"
- + " \"sales\" : [ \n" + " { \"firstName\" : \"Sally\", \n"
- + " \"lastName\" : \"Green\",\n" + " \"age\" : 27 },\n"
- + "\n" + " { \"firstName\" : \"Jim\", \n"
- + " \"lastName\" : \"Galley\",\n" + " \"age\" : 41 }\n"
- + " ] \n" + "} ";
-
- String json = json3;
- System.out.println("json = " + json);
- JsonNode rawJsonNode = new ObjectMapper().readTree(json);
-
- System.out.println(
- "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
- String flattenJson = JsonFlattener.flatten(json);
-
- System.out.println("flattenJson = " + flattenJson);
- JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
-
- System.out
- .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
- Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
- System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
- NestedObjectIndexCreator creator = new NestedObjectIndexCreator(null, null, null);
- List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
- System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
-// creator.add(json.getBytes());
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index a6817a0..ecf7e30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -47,6 +47,7 @@ public class IndexLoadingConfig {
private ReadMode _readMode = ReadMode.DEFAULT_MODE;
private List<String> _sortedColumns = Collections.emptyList();
private Set<String> _invertedIndexColumns = new HashSet<>();
+ private Set<String> _jsonIndexColumns = new HashSet<>();
private Set<String> _textIndexColumns = new HashSet<>();
private Set<String> _rangeIndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
@@ -92,6 +93,11 @@ public class IndexLoadingConfig {
_invertedIndexColumns.addAll(invertedIndexColumns);
}
+ List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+ if (jsonIndexColumns != null) {
+ _jsonIndexColumns.addAll(jsonIndexColumns);
+ }
+
List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
if (rangeIndexColumns != null) {
_rangeIndexColumns.addAll(rangeIndexColumns);
@@ -222,6 +228,10 @@ public class IndexLoadingConfig {
return _invertedIndexColumns;
}
+ /**
+  * @return the columns configured for JSON indexing (not the inverted index columns)
+  */
+ public Set<String> getJsonIndexColumns() {
+   return _jsonIndexColumns;
+ }
+
public Set<String> getRangeIndexColumns() {
return _rangeIndexColumns;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..06a7adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.JSONIndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -113,6 +114,11 @@ public class SegmentPreProcessor implements AutoCloseable {
new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
rangeIndexHandler.createRangeIndices();
+ // Create json indices according to the index config.
+ JSONIndexHandler jsonIndexHandler =
+ new JSONIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+ jsonIndexHandler.createJsonIndices();
+
Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
if (textIndexColumns.size() > 0) {
TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
new file mode 100644
index 0000000..90ca344
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+
+
+ /**
+  * Creates missing JSON indices for the configured columns of an existing segment.
+  *
+  * <p>For each eligible column the handler reads every value through the forward index (and the
+  * dictionary, when the column has one), feeds it to a {@link JSONIndexCreator}, and for v3 segments
+  * copies the generated index file into the single-file segment store. An {@code .inprogress} marker file
+  * is used to detect and clean up after an interrupted previous run.
+  */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public class JSONIndexHandler {
+   private static final Logger LOGGER = LoggerFactory.getLogger(JSONIndexHandler.class);
+
+   private final File _indexDir;
+   private final SegmentDirectory.Writer _segmentWriter;
+   private final String _segmentName;
+   private final SegmentVersion _segmentVersion;
+   private final Set<ColumnMetadata> _jsonIndexColumns = new HashSet<>();
+
+   /**
+    * @param indexDir segment index directory in which index files are created
+    * @param segmentMetadata metadata of the segment being processed
+    * @param indexLoadingConfig source of the JSON index column names
+    * @param segmentWriter writer used to read existing indices and to store the new ones
+    */
+   public JSONIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+       SegmentDirectory.Writer segmentWriter) {
+     _indexDir = indexDir;
+     _segmentWriter = segmentWriter;
+     _segmentName = segmentMetadata.getName();
+     _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+     // Only create json index on unsorted columns that exist in this segment (with or without dictionary)
+     for (String column : indexLoadingConfig.getJsonIndexColumns()) {
+       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+       if (columnMetadata != null && !columnMetadata.isSorted()) {
+         _jsonIndexColumns.add(columnMetadata);
+       }
+     }
+   }
+
+   /**
+    * Creates the JSON index for every eligible column that does not have one yet.
+    *
+    * @throws IOException if reading the segment or writing an index fails
+    */
+   public void createJsonIndices()
+       throws IOException {
+     for (ColumnMetadata columnMetadata : _jsonIndexColumns) {
+       createJSONIndexForColumn(columnMetadata);
+     }
+   }
+
+   /**
+    * Creates the JSON index for one column unless it already exists, using a marker file to recover from
+    * an interrupted previous attempt.
+    */
+   private void createJSONIndexForColumn(ColumnMetadata columnMetadata)
+       throws IOException {
+     String column = columnMetadata.getColumnName();
+
+     File inProgress = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION + ".inprogress");
+     File jsonIndexFile = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION);
+
+     if (!inProgress.exists()) {
+       // Marker file does not exist, which means last run ended normally.
+
+       if (_segmentWriter.hasIndexFor(column, ColumnIndexType.JSON_INDEX)) {
+         // Skip creating json index if already exists.
+
+         LOGGER.info("Found json index for segment: {}, column: {}", _segmentName, column);
+         return;
+       }
+
+       // Create a marker file.
+       FileUtils.touch(inProgress);
+     } else {
+       // Marker file exists, which means last run gets interrupted.
+
+       // Remove json index if exists.
+       // For v1 and v2, it's the actual json index. For v3, it's the temporary json index.
+       FileUtils.deleteQuietly(jsonIndexFile);
+     }
+
+     // Create new json index for the column.
+     LOGGER.info("Creating new json index for segment: {}, column: {}", _segmentName, column);
+     if (columnMetadata.hasDictionary()) {
+       handleDictionaryBasedColumn(columnMetadata);
+     } else {
+       handleNonDictionaryBasedColumn(columnMetadata);
+     }
+
+     // For v3, write the generated json index file into the single file and remove it.
+     if (_segmentVersion == SegmentVersion.v3) {
+       LoaderUtils.writeIndexToV3Format(_segmentWriter, column, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+     }
+
+     // Delete the marker file.
+     FileUtils.deleteQuietly(inProgress);
+
+     LOGGER.info("Created json index for segment: {}, column: {}", _segmentName, column);
+   }
+
+   /**
+    * Builds the index for a dictionary-encoded column by resolving each document's dict id to its bytes.
+    *
+    * @throws IllegalStateException if the column is multi-valued
+    */
+   private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+       throws IOException {
+     int numDocs = columnMetadata.getTotalDocs();
+     try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+         Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter);
+         JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+             FieldSpec.DataType.BYTES)) {
+       if (columnMetadata.isSingleValue()) {
+         // Single-value column
+         for (int i = 0; i < numDocs; i++) {
+           int dictId = forwardIndexReader.getDictId(i, readerContext);
+           jsonIndexCreator.add(dictionary.getBytesValue(dictId));
+         }
+       } else {
+         // Multi-value column
+         throw new IllegalStateException("JSON Indexing is not supported on multi-valued columns ");
+       }
+       jsonIndexCreator.seal();
+     }
+   }
+
+   /**
+    * Builds the index for a raw (no-dictionary) column by reading each document's bytes directly.
+    *
+    * @throws UnsupportedOperationException if the column is neither STRING nor BYTES
+    * @throws IllegalStateException if the column is multi-valued
+    */
+   private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+       throws IOException {
+     FieldSpec.DataType dataType = columnMetadata.getDataType();
+     // Reject only when the type is neither of the two supported ones. With "||" this was always true.
+     if (dataType != FieldSpec.DataType.BYTES && dataType != FieldSpec.DataType.STRING) {
+       throw new UnsupportedOperationException("JSON indexing is only supported for STRING/BYTES datatype but found: "+ dataType);
+     }
+     int numDocs = columnMetadata.getTotalDocs();
+     try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+         JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+             dataType)) {
+       if (!columnMetadata.isSingleValue()) {
+         // Multi-value column
+         throw new IllegalStateException("JSON Indexing is not supported on multi-valued columns ");
+       }
+       // Single-value STRING/BYTES column (type already validated above).
+       for (int i = 0; i < numDocs; i++) {
+         jsonIndexCreator.add(forwardIndexReader.getBytes(i, readerContext));
+       }
+       jsonIndexCreator.seal();
+     }
+   }
+
+   /**
+    * Opens a fixed-bit forward index reader on the column's forward index buffer.
+    *
+    * <p>NOTE(review): a fixed-bit reader is returned even when this is called for a column without a
+    * dictionary; confirm that raw STRING/BYTES forward indices can actually be read through it.
+    */
+   private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata,
+       SegmentDirectory.Writer segmentWriter)
+       throws IOException {
+     PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+     int numRows = columnMetadata.getTotalDocs();
+     int numBitsPerValue = columnMetadata.getBitsPerElement();
+     if (columnMetadata.isSingleValue()) {
+       return new FixedBitSVForwardIndexReader(buffer, numRows, numBitsPerValue);
+     } else {
+       return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
+           numBitsPerValue);
+     }
+   }
+
+   /**
+    * Loads the column's dictionary from the segment.
+    */
+   private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata,
+       SegmentDirectory.Writer segmentWriter)
+       throws IOException {
+     PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+     return PhysicalColumnIndexContainer.loadDictionary(buffer, columnMetadata, false);
+   }
+ }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
new file mode 100644
index 0000000..efb6c8b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+ /**
+  * Reader for a JSON index stored in a single {@link PinotDataBuffer}.
+  *
+  * <p>Layout as consumed by the constructor:
+  * <ul>
+  *   <li>int at offset 0: index version</li>
+  *   <li>int at offset 4: max key length (used as the dictionary's bytes-per-value)</li>
+  *   <li>6 pairs of (long startOffset, long size), one per sub-buffer, in the order: dictionary header,
+  *   dictionary offsets, dictionary data, inverted index offsets, inverted index data,
+  *   flattened-doc-id to doc-id mapping</li>
+  * </ul>
+  */
+ public class JSONIndexReader implements Closeable {
+
+   private static final int EXPECTED_VERSION = 1;
+
+   // Positions of the sub-buffers within the (startOffset, size) table of the index header.
+   private static final int DICT_HEADER_INDEX = 0;
+   private static final int DICT_OFFSET_INDEX = 1;
+   private static final int DICT_DATA_INDEX = 2;
+   private static final int INV_OFFSET_INDEX = 3;
+   private static final int INV_DATA_INDEX = 4;
+   private static final int FLATTENED_2_ROOT_INDEX = 5;
+
+   // dictionaryHeaderFile, dictionaryOffsetFile, dictionaryFile, invIndexOffsetFile, invIndexFile,
+   // FlattenedDocId2DocIdMappingFile
+   private static final int NUM_BUFFERS = 6;
+
+   private final BitmapInvertedIndexReader invertedIndexReader;
+   private final StringDictionary dictionary;
+   private final long cardinality;
+   private final long numFlattenedDocs;
+
+   /**
+    * Parses the index header and creates the dictionary and inverted index readers as views over the buffer.
+    *
+    * @param pinotDataBuffer buffer holding the complete JSON index
+    * @throws IllegalStateException if the stored version differs from the version supported by this reader
+    */
+   public JSONIndexReader(PinotDataBuffer pinotDataBuffer) {
+     int version = pinotDataBuffer.getInt(0);
+     // Guava templates use %s placeholders; the previous "{}" placeholders were never substituted.
+     Preconditions.checkState(version == EXPECTED_VERSION,
+         "Index version:%s is not supported by this reader. expected version:%s", version, EXPECTED_VERSION);
+     int maxKeyLength = pinotDataBuffer.getInt(Integer.BYTES);
+
+     // Read the (startOffset, size) pair of every sub-buffer; the table starts right after the two header ints.
+     long[] bufferStartOffsets = new long[NUM_BUFFERS];
+     long[] bufferSizeArray = new long[NUM_BUFFERS];
+     for (int i = 0; i < NUM_BUFFERS; i++) {
+       long entryOffset = 2L * Integer.BYTES + 2L * i * Long.BYTES;
+       bufferStartOffsets[i] = pinotDataBuffer.getLong(entryOffset);
+       bufferSizeArray[i] = pinotDataBuffer.getLong(entryOffset + Long.BYTES);
+     }
+     // The dictionary offset buffer holds one int per key plus one extra entry.
+     cardinality = bufferSizeArray[DICT_OFFSET_INDEX] / Integer.BYTES - 1;
+     // The flattened-to-root mapping holds one int per flattened doc.
+     numFlattenedDocs = bufferSizeArray[FLATTENED_2_ROOT_INDEX] / Integer.BYTES;
+
+     // The dictionary header, offsets and data are read as one contiguous region starting at the header.
+     long dictionaryStartOffset = bufferStartOffsets[DICT_HEADER_INDEX];
+     long dictionarySize =
+         bufferSizeArray[DICT_HEADER_INDEX] + bufferSizeArray[DICT_OFFSET_INDEX] + bufferSizeArray[DICT_DATA_INDEX];
+     PinotDataBuffer dictionaryBuffer =
+         pinotDataBuffer.view(dictionaryStartOffset, dictionaryStartOffset + dictionarySize);
+     dictionary = new StringDictionary(dictionaryBuffer, (int) cardinality, maxKeyLength, (byte) 0);
+
+     // Likewise the inverted index offsets and data are read as one contiguous region.
+     long invIndexStartOffset = bufferStartOffsets[INV_OFFSET_INDEX];
+     long invIndexSize = bufferSizeArray[INV_OFFSET_INDEX] + bufferSizeArray[INV_DATA_INDEX];
+     PinotDataBuffer invIndexBuffer = pinotDataBuffer.view(invIndexStartOffset, invIndexStartOffset + invIndexSize);
+     invertedIndexReader = new BitmapInvertedIndexReader(invIndexBuffer, (int) cardinality);
+   }
+
+   /**
+    * Returns the document ids matching the given predicate, evaluated against the index dictionary.
+    *
+    * <p>For exclusive predicates the non-matching posting lists are combined and then flipped over the range
+    * {@code [0, numFlattenedDocs)}. The returned bitmap is always a fresh instance, so callers may modify it
+    * without affecting the bitmaps held by the inverted index reader.
+    * NOTE(review): the ids are presumably flattened doc ids rather than root doc ids -- confirm with callers.
+    *
+    * @param predicate predicate to evaluate
+    * @return a mutable bitmap of matching doc ids
+    */
+   public MutableRoaringBitmap getMatchingDocIds(Predicate predicate) {
+     PredicateEvaluator predicateEvaluator =
+         PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dictionary, FieldSpec.DataType.BYTES);
+     boolean exclusive = predicateEvaluator.isExclusive();
+     int[] dictIds = exclusive ? predicateEvaluator.getNonMatchingDictIds() : predicateEvaluator.getMatchingDictIds();
+     int numDictIds = dictIds.length;
+
+     if (numDictIds == 1) {
+       ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[0]);
+       // Never flip the reader's bitmap in place: the static flip and toMutableRoaringBitmap() both return
+       // a new bitmap, leaving the posting list intact for subsequent queries.
+       return exclusive ? ImmutableRoaringBitmap.flip(docIds, 0L, numFlattenedDocs) : docIds.toMutableRoaringBitmap();
+     }
+
+     ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numDictIds];
+     for (int i = 0; i < numDictIds; i++) {
+       bitmaps[i] = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[i]);
+     }
+     // or() produces a new bitmap, so flipping it in place is safe.
+     MutableRoaringBitmap docIds = ImmutableRoaringBitmap.or(bitmaps);
+     if (exclusive) {
+       docIds.flip(0L, numFlattenedDocs);
+     }
+     return docIds;
+   }
+
+   /**
+    * No-op: this reader releases nothing here.
+    * NOTE(review): presumably the backing PinotDataBuffer is owned and closed by the caller -- confirm.
+    */
+   @Override
+   public void close()
+       throws IOException {
+   }
+ }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..f9063b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,7 +25,8 @@ public enum ColumnIndexType {
BLOOM_FILTER("bloom_filter"),
NULLVALUE_VECTOR("nullvalue_vector"),
TEXT_INDEX("text_index"),
- RANGE_INDEX("range_index");
+ RANGE_INDEX("range_index"),
+ JSON_INDEX("json_index");
private final String indexName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 51dd2fb..4ffc133 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -135,6 +136,10 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
case TEXT_INDEX:
filename = column + LuceneTextIndexCreator.LUCENE_TEXT_INDEX_FILE_EXTENSION;
break;
+ case JSON_INDEX:
+ filename = column + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+ break;
+
default:
throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3dd137b..7965654 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
public class IndexingConfig extends BaseJsonConfig {
private List<String> _invertedIndexColumns;
private List<String> _rangeIndexColumns;
+ private List<String> _jsonIndexColumns;
private boolean _autoGeneratedInvertedIndex;
private boolean _createInvertedIndexDuringSegmentGeneration;
private List<String> _sortedColumn;
@@ -71,6 +72,14 @@ public class IndexingConfig extends BaseJsonConfig {
_rangeIndexColumns = rangeIndexColumns;
}
+ /**
+  * Returns the columns configured for JSON indexing, exactly as stored by {@code setJsonIndexColumns}.
+  */
+ public List<String> getJsonIndexColumns() {
+   return this._jsonIndexColumns;
+ }
+
+ /**
+  * Sets the columns on which a JSON index should be built. The list is stored by reference, not copied.
+  *
+  * @param jsonIndexColumns names of the columns to JSON-index
+  */
+ public void setJsonIndexColumns(List<String> jsonIndexColumns) {
+   this._jsonIndexColumns = jsonIndexColumns;
+ }
+
public boolean isAutoGeneratedInvertedIndex() {
return _autoGeneratedInvertedIndex;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..c9c245b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -337,7 +337,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec> {
public enum DataType {
// LIST is for complex lists which is different from multi-value column of primitives
// STRUCT, MAP and LIST are composable to form a COMPLEX field
- INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST;
+ INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST, JSON;
/**
* Returns the data type stored in Pinot.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org