You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/10/28 20:00:00 UTC

[incubator-pinot] branch json-indexing created (now bdfd3f1)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at bdfd3f1  Adding index creator and reader

This branch includes the following new commits:

     new fd32a3e  Initial commit
     new bdfd3f1  Adding index creator and reader

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/02: Adding index creator and reader

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit bdfd3f105d170f3dc02f0a25feb3b922fe3ffc6c
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 18 22:05:51 2020 -0700

    Adding index creator and reader
---
 .../io/util/VarLengthBytesValueReaderWriter.java   |  28 +-
 .../operator/filter/BitmapBasedFilterOperator.java |   4 +-
 .../core/operator/filter/FilterOperatorUtils.java  |   2 +-
 .../operator/filter/JSONMatchFilterOperator.java   | 138 +++++
 .../transform/function/CastTransformFunction.java  |   2 +-
 .../{Predicate.java => JSONMatchPredicate.java}    |  57 +-
 .../query/request/context/predicate/Predicate.java |   3 +-
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../segment/creator/impl/inv/JSONIndexCreator.java | 656 +++++++++++++++++++++
 .../creator/impl/inv/NestedObjectIndexCreator.java | 158 -----
 .../segment/index/loader/IndexLoadingConfig.java   |  10 +
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +
 .../loader/invertedindex/JSONIndexHandler.java     | 205 +++++++
 .../segment/index/readers/JSONIndexReader.java     | 147 +++++
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |   5 +
 .../pinot/spi/config/table/IndexingConfig.java     |   9 +
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   2 +-
 18 files changed, 1249 insertions(+), 187 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 5a6a25a..88f56f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.io.util;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -63,19 +66,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
   /**
    * Magic bytes used to identify the dictionary files written in variable length bytes format.
    */
-  private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+  public static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
 
   /**
    * Increment this version if there are any structural changes in the store format and
    * deal with backward compatibility correctly based on old versions.
    */
-  private static final int VERSION = 1;
+  public static final int VERSION = 1;
 
   // Offsets of different fields in the header. Having as constants for readability.
-  private static final int VERSION_OFFSET = MAGIC_BYTES.length;
-  private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
-  private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
-  private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+  public static final int VERSION_OFFSET = MAGIC_BYTES.length;
+  public static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+  public static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+  public static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
 
   private final PinotDataBuffer _dataBuffer;
 
@@ -144,6 +147,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
     return false;
   }
 
+  public static byte[] getHeaderBytes(int numElements)
+      throws IOException {
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream(HEADER_LENGTH);
+    DataOutputStream dos = new DataOutputStream(out);
+    dos.write(MAGIC_BYTES);
+    dos.writeInt(VERSION);
+    dos.writeInt(numElements);
+    dos.writeInt(HEADER_LENGTH);
+    dos.close();
+    return out.toByteArray();
+  }
+
   private void writeHeader() {
     for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
       _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index d9c25e1..9c307a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -38,9 +38,9 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {
   private final boolean _exclusive;
   private final int _numDocs;
 
-  BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) {
+  public BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, InvertedIndexReader invertedIndexReader, int numDocs) {
     _predicateEvaluator = predicateEvaluator;
-    _invertedIndexReader = dataSource.getInvertedIndex();
+    _invertedIndexReader = invertedIndexReader;
     _docIds = null;
     _exclusive = predicateEvaluator.isExclusive();
     _numDocs = numDocs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index 3bc676e..5076ef6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -65,7 +65,7 @@ public class FilterOperatorUtils {
         return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
       }
       if (dataSource.getInvertedIndex() != null) {
-        return new BitmapBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+        return new BitmapBasedFilterOperator(predicateEvaluator, dataSource.getInvertedIndex(), numDocs);
       }
       return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
new file mode 100644
index 0000000..adb0550
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.InPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotInPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+@SuppressWarnings("rawtypes")
+public class JSONMatchFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "JSONMatchFilterOperator";
+
+  private final JSONIndexReader _nestedObjectIndexReader;
+  private final int _numDocs;
+  private String _column;
+  private final FilterContext _filterContext;
+
+  public JSONMatchFilterOperator(String column, FilterContext filterContext, JSONIndexReader nestedObjectIndexReader,
+      int numDocs) {
+    _column = column;
+    _filterContext = filterContext;
+    _nestedObjectIndexReader = nestedObjectIndexReader;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    ImmutableRoaringBitmap docIds = process(_filterContext);
+    System.out.println("docIds = " + docIds);
+    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs));
+  }
+
+  private MutableRoaringBitmap process(FilterContext filterContext) {
+    List<FilterContext> children = _filterContext.getChildren();
+    MutableRoaringBitmap resultBitmap = null;
+
+    switch (filterContext.getType()) {
+      case AND:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.and(process(child));
+          }
+        }
+        break;
+      case OR:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.or(process(child));
+          }
+        }
+        break;
+      case PREDICATE:
+        Predicate predicate = filterContext.getPredicate();
+        Predicate newPredicate = null;
+        switch (predicate.getType()) {
+
+          case EQ:
+            EqPredicate eqPredicate = (EqPredicate) predicate;
+            newPredicate = new EqPredicate(ExpressionContext.forIdentifier(_column),
+                eqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + eqPredicate
+                    .getValue());
+            break;
+          case NOT_EQ:
+            NotEqPredicate nEqPredicate = (NotEqPredicate) predicate;
+            newPredicate = new NotEqPredicate(ExpressionContext.forIdentifier(_column),
+                nEqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + nEqPredicate.getValue());
+            break;
+          case IN:
+            InPredicate inPredicate = (InPredicate) predicate;
+            List<String> newInValues = inPredicate.getValues().stream().map(
+                value -> inPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new InPredicate(ExpressionContext.forIdentifier(_column), newInValues);
+            break;
+          case NOT_IN:
+            NotInPredicate notInPredicate = (NotInPredicate) predicate;
+            List<String> newNotInValues = inPredicate.getValues().stream().map(
+                value -> notInPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new InPredicate(ExpressionContext.forIdentifier(_column), newNotInValues);
+            break;
+          case IS_NULL:
+            newPredicate = predicate;
+            break;
+          case IS_NOT_NULL:
+            newPredicate = predicate;
+            break;
+          case RANGE:
+          case REGEXP_LIKE:
+          case TEXT_MATCH:
+            throw new UnsupportedOperationException("JSON Match does not support RANGE, REGEXP or TEXTMATCH");
+        }
+
+        resultBitmap = _nestedObjectIndexReader.getMatchingDocIds(newPredicate);
+        break;
+    }
+    return resultBitmap;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
index c826446..cb6227f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToSDFTra
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 
 
-public class CastTransformFunction extends BaseTransformFunction {
+  public class CastTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "cast";
 
   private TransformFunction _transformFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
index 203e950..97ad787 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
@@ -18,30 +18,55 @@
  */
 package org.apache.pinot.core.query.request.context.predicate;
 
+import java.util.Objects;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 
 
 /**
- * The {@code Predicate} class represents the predicate in the filter.
- * <p>Currently the query engine only accepts string literals as the right-hand side of the predicate, so we store the
- * right-hand side of the predicate as string or list of strings.
+ * Predicate for JSON_MATCH.
  */
-public interface Predicate {
-  enum Type {
-    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+public class JSONMatchPredicate implements Predicate {
+  private final ExpressionContext _lhs;
+  private final String _value;
 
-    public boolean isExclusive() {
-      return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
+  public JSONMatchPredicate(ExpressionContext lhs, String value) {
+    _lhs = lhs;
+    _value = value;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.JSON_MATCH;
+  }
+
+  @Override
+  public ExpressionContext getLhs() {
+    return _lhs;
+  }
+
+  public String getValue() {
+    return _value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JSONMatchPredicate)) {
+      return false;
     }
+    JSONMatchPredicate that = (JSONMatchPredicate) o;
+    return Objects.equals(_lhs, that._lhs) && Objects.equals(_value, that._value);
   }
 
-  /**
-   * Returns the type of the predicate.
-   */
-  Type getType();
+  @Override
+  public int hashCode() {
+    return Objects.hash(_lhs, _value);
+  }
 
-  /**
-   * Returns the left-hand side expression of the predicate.
-   */
-  ExpressionContext getLhs();
+  @Override
+  public String toString() {
+    return "json_match(" + _lhs + ",'" + _value + "')";
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
index 203e950..a204fc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
  */
 public interface Predicate {
   enum Type {
-    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL, JSON_MATCH;
 
     public boolean isExclusive() {
       return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
@@ -44,4 +44,5 @@ public interface Predicate {
    * Returns the left-hand side expression of the predicate.
    */
   ExpressionContext getLhs();
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..e745120 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+    public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
     public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
new file mode 100644
index 0000000..705f690
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import com.google.common.io.Files;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.JSONMatchFilterOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexCreator implements Closeable {
+
+  // Separator used to join a key and its value to form the posting-list key.
+  public static String POSTING_LIST_KEY_SEPARATOR = "|";
+  static int FLUSH_THRESHOLD = 50_000;
+  static int VERSION = 1;
+  private final File flattenedDocId2RootDocIdMappingFile;
+  private final File postingListFile;
+  private File dictionaryheaderFile;
+  private File dictionaryOffsetFile;
+  private File dictionaryFile;
+  private File invertedIndexOffsetFile;
+  private File invertedIndexFile;
+  private File outputIndexFile;
+
+  private int docId = 0;
+  private int numFlatennedDocId = 0;
+  int chunkId = 0;
+
+  private DataOutputStream postingListWriter;
+  private DataOutputStream flattenedDocId2RootDocIdWriter;
+
+  Map<String, List<Integer>> postingListMap = new TreeMap<>();
+  List<Integer> flattenedDocIdList = new ArrayList<>();
+  List<Integer> postingListChunkOffsets = new ArrayList<>();
+  List<Integer> chunkLengths = new ArrayList<>();
+  private FieldSpec fieldSpec;
+
+  public JSONIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType)
+      throws IOException {
+    this.fieldSpec = fieldSpec;
+    System.out.println("indexDir = " + indexDir);
+
+    String name = fieldSpec.getName();
+    postingListFile = new File(indexDir + name + "_postingList.buf");
+    postingListWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(postingListFile)));
+    postingListChunkOffsets.add(postingListWriter.size());
+
+    dictionaryheaderFile = new File(indexDir, name + "_dictionaryHeader.buf");
+    dictionaryOffsetFile = new File(indexDir, name + "_dictionaryOffset.buf");
+    dictionaryFile = new File(indexDir, name + "_dictionary.buf");
+    invertedIndexOffsetFile = new File(indexDir, name + "_invertedIndexOffset.buf");
+    invertedIndexFile = new File(indexDir, name + "_invertedIndex.buf");
+    flattenedDocId2RootDocIdMappingFile = new File(indexDir, name + "_flattenedDocId.buf");
+    flattenedDocId2RootDocIdWriter =
+        new DataOutputStream(new BufferedOutputStream(new FileOutputStream(flattenedDocId2RootDocIdMappingFile)));
+
+    //output file
+    outputIndexFile = new File(indexDir, name + ".nested.idx");
+  }
+
+  public void add(byte[] data)
+      throws IOException {
+
+    JsonNode jsonNode = new ObjectMapper().readTree(data);
+    List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+    for (Map<String, String> map : flattenedMapList) {
+      //
+      for (Map.Entry<String, String> entry : map.entrySet()) {
+        //handle key posting list
+        String key = entry.getKey();
+
+        List<Integer> keyPostingList = postingListMap.get(key);
+        if (keyPostingList == null) {
+          keyPostingList = new ArrayList<>();
+          postingListMap.put(key, keyPostingList);
+        }
+        keyPostingList.add(numFlatennedDocId);
+
+        //handle keyvalue posting list
+        String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+        List<Integer> keyValuePostingList = postingListMap.get(keyValue);
+        if (keyValuePostingList == null) {
+          keyValuePostingList = new ArrayList<>();
+          postingListMap.put(keyValue, keyValuePostingList);
+        }
+        keyValuePostingList.add(numFlatennedDocId);
+      }
+      //flattenedDocId2RootDocIdMapping
+      flattenedDocIdList.add(numFlatennedDocId);
+
+      numFlatennedDocId++;
+    }
+    docId++;
+
+    //flush data
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  /**
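+   * Adds one multi-value document: each of the first {@code length} entries is parsed as JSON and indexed under one docId.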
+   * @param dataArray
+   * @param length
+   * @throws IOException
+   */
+  public void add(byte[][] dataArray, int length)
+      throws IOException {
+
+    for (int i = 0; i < length; i++) {
+      byte[] data = dataArray[i];
+      JsonNode jsonNode = new ObjectMapper().readTree(data);
+      List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+      for (Map<String, String> map : flattenedMapList) {
+        //
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+          //handle key posting list
+          String key = entry.getKey();
+
+          List<Integer> keyPostingList = postingListMap.get(key);
+          if (keyPostingList == null) {
+            keyPostingList = new ArrayList<>();
+            postingListMap.put(key, keyPostingList);
+          }
+          keyPostingList.add(numFlatennedDocId);
+
+          //handle keyvalue posting list
+          String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+          List<Integer> keyValuePostingList = postingListMap.get(keyValue);
+          if (keyValuePostingList == null) {
+            keyValuePostingList = new ArrayList<>();
+            postingListMap.put(keyValue, keyValuePostingList);
+          }
+          keyValuePostingList.add(numFlatennedDocId);
+        }
+        //flattenedDocId2RootDocIdMapping
+        flattenedDocIdList.add(numFlatennedDocId);
+
+        numFlatennedDocId++;
+      }
+    }
+    docId++;
+
+    //flush data
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  public void seal()
+      throws IOException {
+
+    flush();
+
+    flattenedDocId2RootDocIdWriter.close();
+    postingListWriter.close();
+
+    //key posting list merging
+    System.out.println("InvertedIndex");
+    System.out.println("=================");
+
+    int maxKeyLength = createInvertedIndex(postingListFile, postingListChunkOffsets, chunkLengths);
+    System.out.println("=================");
+
+    int flattenedDocid = 0;
+    DataInputStream flattenedDocId2RootDocIdReader =
+        new DataInputStream(new BufferedInputStream(new FileInputStream(flattenedDocId2RootDocIdMappingFile)));
+    int[] rootDocIdArray = new int[numFlatennedDocId];
+    while (flattenedDocid < numFlatennedDocId) {
+      rootDocIdArray[flattenedDocid++] = flattenedDocId2RootDocIdReader.readInt();
+    }
+    System.out.println("FlattenedDocId  to RootDocId Mapping = ");
+    System.out.println(Arrays.toString(rootDocIdArray));
+
+    //PUT all contents into one file
+
+    //header
+    // version + maxKeyLength + [offset + length for each of the six sections: dictionary header, dictionary offset file, dictionary file, index offset file, index file, flattened docId to rootDocId file]
+    long headerSize = 2 * Integer.BYTES + 6 * 2 * Long.BYTES;
+
+    long dataSize =
+        dictionaryheaderFile.length() + dictionaryOffsetFile.length() + dictionaryFile.length() + invertedIndexFile
+            .length() + invertedIndexOffsetFile.length() + flattenedDocId2RootDocIdMappingFile.length();
+
+    long totalSize = headerSize + dataSize;
+    PinotDataBuffer pinotDataBuffer =
+        PinotDataBuffer.mapFile(outputIndexFile, false, 0, totalSize, ByteOrder.BIG_ENDIAN, "Nested inverted index");
+
+    pinotDataBuffer.putInt(0, VERSION);
+    pinotDataBuffer.putInt(1 * Integer.BYTES, maxKeyLength);
+    long writtenBytes = headerSize;
+
+    //add dictionary header
+    int bufferId = 0;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryheaderFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryheaderFile, 0, dictionaryheaderFile.length());
+    writtenBytes += dictionaryheaderFile.length();
+
+    //add dictionary offset
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryOffsetFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryOffsetFile, 0, dictionaryOffsetFile.length());
+    writtenBytes += dictionaryOffsetFile.length();
+
+    //add dictionary
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryFile, 0, dictionaryFile.length());
+    writtenBytes += dictionaryFile.length();
+
+    //add index offset
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexOffsetFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, invertedIndexOffsetFile, 0, invertedIndexOffsetFile.length());
+    writtenBytes += invertedIndexOffsetFile.length();
+
+    //add index data
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, invertedIndexFile, 0, invertedIndexFile.length());
+    writtenBytes += invertedIndexFile.length();
+
+    //add flattened docid to root doc id mapping
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, flattenedDocId2RootDocIdMappingFile.length());
+    pinotDataBuffer
+        .readFrom(writtenBytes, flattenedDocId2RootDocIdMappingFile, 0, flattenedDocId2RootDocIdMappingFile.length());
+    writtenBytes += flattenedDocId2RootDocIdMappingFile.length();
+  }
+
+  private long getBufferStartOffset(int bufferId) {
+    return 2 * Integer.BYTES + 2 * bufferId * Long.BYTES;
+  }
+
+  /**
+   * Merges the posting-list chunks stored in {@code postingListFile} into the dictionary and inverted index files.
+   * Each chunk is read through its own stream; a priority queue ordered by unsigned lexicographic key comparison
+   * always yields the smallest pending key, and the posting lists of equal keys are unioned into one bitmap before
+   * being handed to the {@link Writer}.
+   * NOTE(review): the merge presumes that the keys inside every chunk are already stored in that same order -
+   * confirm against the code that fills the chunks.
+   *
+   * @param postingListFile file holding all chunks back to back
+   * @param postingListChunkOffsets number of bytes to skip in the file to reach chunk {@code i}
+   * @param chunkLengthList number of (key, posting list) entries in chunk {@code i}
+   * @return the length of the longest key written, as reported by the writer
+   * @throws IOException if the posting list file cannot be read or the index files cannot be written
+   */
+  private int createInvertedIndex(File postingListFile, List<Integer> postingListChunkOffsets,
+      List<Integer> chunkLengthList)
+      throws IOException {
+    // Every chunk gets its own stream over the posting list file; all of them are released in the finally block
+    List<DataInputStream> chunkReaders = new ArrayList<>();
+    try {
+      List<Iterator<ImmutablePair<byte[], int[]>>> chunkIterators = new ArrayList<>();
+
+      for (int i = 0; i < chunkLengthList.size(); i++) {
+        final DataInputStream postingListFileReader =
+            new DataInputStream(new BufferedInputStream(new FileInputStream(postingListFile)));
+        chunkReaders.add(postingListFileReader);
+
+        // skipBytes may skip fewer bytes than requested, so keep going until the chunk start is reached
+        int bytesToSkip = postingListChunkOffsets.get(i);
+        while (bytesToSkip > 0) {
+          int skipped = postingListFileReader.skipBytes(bytesToSkip);
+          if (skipped <= 0) {
+            throw new IOException(
+                "Unable to seek to offset " + postingListChunkOffsets.get(i) + " in " + postingListFile);
+          }
+          bytesToSkip -= skipped;
+        }
+
+        final int length = chunkLengthList.get(i);
+        chunkIterators.add(new Iterator<ImmutablePair<byte[], int[]>>() {
+          int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            return index < length;
+          }
+
+          @Override
+          public ImmutablePair<byte[], int[]> next() {
+            try {
+              int keyLength = postingListFileReader.readInt();
+              byte[] keyBytes = new byte[keyLength];
+              // readFully guarantees the whole key is consumed; read() is allowed to return after a partial read
+              postingListFileReader.readFully(keyBytes);
+
+              int postingListLength = postingListFileReader.readInt();
+              int[] postingList = new int[postingListLength];
+              for (int j = 0; j < postingListLength; j++) {
+                postingList[j] = postingListFileReader.readInt();
+              }
+              index++;
+              return ImmutablePair.of(keyBytes, postingList);
+            } catch (IOException e) {
+              throw new RuntimeException("Failed to read posting list entry from " + postingListFile, e);
+            }
+          }
+        });
+      }
+      final Comparator<byte[]> byteArrayComparator = UnsignedBytes.lexicographicalComparator();
+
+      // PriorityQueue rejects an initial capacity below 1, which would otherwise fail when there are no chunks
+      PriorityQueue<ImmutablePair<Integer, ImmutablePair<byte[], int[]>>> queue =
+          new PriorityQueue<>(Math.max(1, chunkLengthList.size()),
+              (o1, o2) -> byteArrayComparator.compare(o1.getRight().getLeft(), o2.getRight().getLeft()));
+      for (int i = 0; i < chunkIterators.size(); i++) {
+        Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(i);
+        if (iterator.hasNext()) {
+          queue.offer(ImmutablePair.of(i, iterator.next()));
+        }
+      }
+      byte[] prevKey = null;
+      RoaringBitmap roaringBitmap = new RoaringBitmap();
+
+      Writer writer = new Writer(dictionaryheaderFile, dictionaryOffsetFile, dictionaryFile, invertedIndexOffsetFile,
+          invertedIndexFile);
+      while (!queue.isEmpty()) {
+        ImmutablePair<Integer, ImmutablePair<byte[], int[]>> poll = queue.poll();
+        byte[] currKey = poll.getRight().getLeft();
+        if (prevKey != null && byteArrayComparator.compare(prevKey, currKey) != 0) {
+          // the key changed: everything collected for the previous key is complete
+          writer.add(prevKey, roaringBitmap);
+          roaringBitmap.clear();
+        }
+
+        roaringBitmap.add(poll.getRight().getRight());
+        prevKey = currKey;
+
+        //add the next key from the chunk where the currKey was removed from
+        Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(poll.getLeft());
+        if (iterator.hasNext()) {
+          queue.offer(ImmutablePair.of(poll.getLeft(), iterator.next()));
+        }
+      }
+
+      // the last key has no successor to trigger its write
+      if (prevKey != null) {
+        writer.add(prevKey, roaringBitmap);
+      }
+      writer.finish();
+      return writer.getMaxDictionaryValueLength();
+    } finally {
+      for (DataInputStream chunkReader : chunkReaders) {
+        try {
+          chunkReader.close();
+        } catch (IOException ignored) {
+          // best effort: the stream was only read from, so a failed close cannot lose data
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends the in-memory posting lists and the flattened-doc-id to root-doc-id mapping to their writers, records
+   * the bookkeeping for the chunk that was just written, and clears the in-memory state.
+   * Each posting list entry is written as: key length, UTF-8 key bytes, number of doc ids, doc ids.
+   * NOTE(review): the value appended to {@code postingListChunkOffsets} is the writer's byte count after this chunk
+   * was written; confirm that the consumer of that list expects this.
+   *
+   * @throws IOException if writing to either writer fails
+   */
+  private void flush()
+      throws IOException {
+    Charset utf8 = Charset.forName("UTF-8");
+
+    //write the key (length|actual bytes) - posting list(length, flattenedDocIds)
+    for (Map.Entry<String, List<Integer>> entry : postingListMap.entrySet()) {
+      byte[] keyBytes = entry.getKey().getBytes(utf8);
+      postingListWriter.writeInt(keyBytes.length);
+      postingListWriter.write(keyBytes);
+      // named differently from the flattenedDocIdList field, which is written out below
+      List<Integer> postingList = entry.getValue();
+      postingListWriter.writeInt(postingList.size());
+      for (int flattenedDocId : postingList) {
+        postingListWriter.writeInt(flattenedDocId);
+      }
+    }
+
+    //write flattened doc id to root docId mapping
+    for (int rootDocId : flattenedDocIdList) {
+      flattenedDocId2RootDocIdWriter.writeInt(rootDocId);
+    }
+    chunkLengths.add(postingListMap.size());
+    postingListChunkOffsets.add(postingListWriter.size());
+    postingListMap.clear();
+    flattenedDocIdList.clear();
+  }
+
+  /**
+   * Flattens a JSON node into a list of flat key/value records.
+   * The direct children of {@code root} are split into value, array and object fields. Each object field is
+   * flattened recursively and combined with the value fields into its own record(s); when there are no object
+   * fields the value fields form the single base record. Every element of every array field is then flattened
+   * and combined with each base record. Nested keys are joined to their parent key with a dot.
+   *
+   * @param root node to flatten
+   * @return the flattened records; contains one (possibly empty) record when {@code root} has no nested fields
+   */
+  private static List<Map<String, String>> unnestJson(JsonNode root) {
+    Map<String, String> valueFields = new TreeMap<>();
+    Map<String, JsonNode> arrayFields = new TreeMap<>();
+    Map<String, JsonNode> objectFields = new TreeMap<>();
+
+    // Sort the direct children into buckets by node kind
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = root.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> field = fieldIterator.next();
+      String fieldName = field.getKey();
+      JsonNode fieldValue = field.getValue();
+      if (fieldValue.isValueNode()) {
+        valueFields.put(fieldName, fieldValue.asText());
+      } else if (fieldValue.isArray()) {
+        // arrays are expanded last, on top of the base records
+        arrayFields.put(fieldName, fieldValue);
+      } else {
+        objectFields.put(fieldName, fieldValue);
+      }
+    }
+
+    // Base records: value fields combined with each flattened object field
+    List<Map<String, String>> baseRecords = new ArrayList<>();
+    for (Map.Entry<String, JsonNode> objectField : objectFields.entrySet()) {
+      modifyKeysInMap(valueFields, baseRecords, objectField.getKey(), objectField.getValue());
+    }
+    if (baseRecords.isEmpty()) {
+      baseRecords.add(valueFields);
+    }
+
+    List<Map<String, String>> records = new ArrayList<>();
+    if (arrayFields.isEmpty()) {
+      records.addAll(baseRecords);
+      return records;
+    }
+
+    // One record per (base record, array element) combination
+    for (Map<String, String> baseRecord : baseRecords) {
+      for (Map.Entry<String, JsonNode> arrayField : arrayFields.entrySet()) {
+        for (JsonNode element : arrayField.getValue()) {
+          modifyKeysInMap(baseRecord, records, arrayField.getKey(), element);
+        }
+      }
+    }
+    return records;
+  }
+
+  /**
+   * Flattens {@code arrNode} and, for every record it produces, appends to {@code resultList} a copy of
+   * {@code flattenedMap} extended with that record's entries, each key prefixed with {@code arrNodeKey} and a dot.
+   * {@code flattenedMap} itself is not modified.
+   *
+   * @param flattenedMap entries every produced record starts from
+   * @param resultList list the produced records are appended to
+   * @param arrNodeKey key of the nested node inside its parent
+   * @param arrNode nested node to flatten
+   */
+  private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
+      String arrNodeKey, JsonNode arrNode) {
+    String keyPrefix = arrNodeKey + ".";
+    for (Map<String, String> nestedRecord : unnestJson(arrNode)) {
+      Map<String, String> combined = new TreeMap<>(flattenedMap);
+      nestedRecord.forEach((nestedKey, nestedValue) -> combined.put(keyPrefix + nestedKey, nestedValue));
+      resultList.add(combined);
+    }
+  }
+
+  /**
+   * Does nothing; this method releases no resources.
+   * NOTE(review): confirm that the writers held by this creator are closed elsewhere (for example when sealing).
+   */
+  @Override
+  public void close()
+      throws IOException {
+    // intentionally empty
+  }
+
+  /**
+   * Writes the dictionary (header, offsets, values) and the inverted index (offsets, serialized bitmaps) files.
+   * Keys are expected to be added one at a time through {@link #add(byte[], RoaringBitmap)}; {@link #finish()}
+   * writes the trailing offsets and the dictionary header, closes all streams and rewrites the stored offsets so
+   * that they account for the bytes that precede the data section.
+   * The class only touches the files handed to its constructor, so it does not need the enclosing instance.
+   */
+  private static class Writer {
+    private final DataOutputStream _dictionaryHeaderWriter;
+    private final DataOutputStream _dictionaryOffsetWriter;
+    private final File _dictionaryOffsetFile;
+    private final DataOutputStream _dictionaryWriter;
+    private final DataOutputStream _invertedIndexOffsetWriter;
+    private final File _invertedIndexOffsetFile;
+    private final DataOutputStream _invertedIndexWriter;
+    private int _dictId;
+    private int _dictOffset;
+    private int _invertedIndexOffset;
+    // stays at Integer.MIN_VALUE until the first key is added
+    private int _maxDictionaryValueLength = Integer.MIN_VALUE;
+
+    /**
+     * Opens (and truncates) the five output files.
+     *
+     * @throws IOException if any of the files cannot be opened for writing
+     */
+    public Writer(File dictionaryheaderFile, File dictionaryOffsetFile, File dictionaryFile,
+        File invertedIndexOffsetFile, File invertedIndexFile)
+        throws IOException {
+      _dictionaryHeaderWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryheaderFile)));
+
+      _dictionaryOffsetWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryOffsetFile)));
+      _dictionaryOffsetFile = dictionaryOffsetFile;
+      _dictionaryWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryFile)));
+      _invertedIndexOffsetWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexOffsetFile)));
+      _invertedIndexOffsetFile = invertedIndexOffsetFile;
+      _invertedIndexWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexFile)));
+      _dictId = 0;
+      _dictOffset = 0;
+      _invertedIndexOffset = 0;
+    }
+
+    /**
+     * Appends one dictionary entry together with its posting list. The bitmap is serialized immediately, so the
+     * caller may reuse it afterwards.
+     *
+     * @param key dictionary value
+     * @param roaringBitmap doc ids matching {@code key}
+     * @throws IOException if any of the underlying files cannot be written
+     */
+    public void add(byte[] key, RoaringBitmap roaringBitmap)
+        throws IOException {
+      if (key.length > _maxDictionaryValueLength) {
+        _maxDictionaryValueLength = key.length;
+      }
+      //write the key to dictionary
+      _dictionaryOffsetWriter.writeInt(_dictOffset);
+      _dictionaryWriter.write(key);
+
+      //write the roaringBitmap to inverted index
+      _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+
+      int serializedSizeInBytes = roaringBitmap.serializedSizeInBytes();
+      byte[] serializedRoaringBitmap = new byte[serializedSizeInBytes];
+      ByteBuffer serializedRoaringBitmapBuffer = ByteBuffer.wrap(serializedRoaringBitmap);
+      roaringBitmap.serialize(serializedRoaringBitmapBuffer);
+      _invertedIndexWriter.write(serializedRoaringBitmap);
+
+      //increment offsets
+      _dictOffset = _dictOffset + key.length;
+      _invertedIndexOffset = _invertedIndexOffset + serializedSizeInBytes;
+      //increment the dictionary id
+      _dictId = _dictId + 1;
+    }
+
+    /**
+     * Completes all files: writes the trailing offsets and the dictionary header, closes the streams and shifts
+     * the stored offsets by the number of bytes that precede the data.
+     *
+     * @throws IOException if writing, closing or re-mapping any of the files fails
+     */
+    void finish()
+        throws IOException {
+      try {
+        //InvertedIndexReader and VarlengthBytesValueReaderWriter need one extra entry for offsets since the length
+        //for index i is computed as offset[i+1] - offset[i]
+        _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+        _dictionaryOffsetWriter.writeInt(_dictOffset);
+
+        byte[] headerBytes = VarLengthBytesValueReaderWriter.getHeaderBytes(_dictId);
+        _dictionaryHeaderWriter.write(headerBytes);
+      } finally {
+        closeWriters();
+      }
+
+      //data offsets started with zero but the actual dictionary and index will contain (header + offsets + data),
+      //so all the offsets must be adjusted (i.e. add size(header) + size(offsets) to each offset value)
+      int numOffsets = _dictId + 1;
+      int dictOffsetBase = _dictionaryHeaderWriter.size() + _dictionaryOffsetWriter.size();
+      shiftOffsets(_dictionaryOffsetFile, numOffsets, dictOffsetBase, "dictionary offset file");
+
+      int invIndexOffsetBase = _invertedIndexOffsetWriter.size();
+      shiftOffsets(_invertedIndexOffsetFile, numOffsets, invIndexOffsetBase, "invertedIndexOffsetFile");
+    }
+
+    /**
+     * Closes every output stream, attempting all of them even if one fails; the first failure is rethrown with
+     * the later ones attached as suppressed exceptions.
+     */
+    private void closeWriters()
+        throws IOException {
+      DataOutputStream[] writers =
+          {_dictionaryHeaderWriter, _dictionaryOffsetWriter, _dictionaryWriter, _invertedIndexOffsetWriter,
+              _invertedIndexWriter};
+      IOException failure = null;
+      for (DataOutputStream writer : writers) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);
+          }
+        }
+      }
+      if (failure != null) {
+        throw failure;
+      }
+    }
+
+    /**
+     * Adds {@code base} to the first {@code numOffsets} big-endian ints stored in {@code offsetFile}.
+     */
+    private void shiftOffsets(File offsetFile, int numOffsets, int base, String description)
+        throws IOException {
+      PinotDataBuffer offsetBuffer =
+          PinotDataBuffer.mapFile(offsetFile, false, 0, offsetFile.length(), ByteOrder.BIG_ENDIAN, description);
+      try {
+        for (int i = 0; i < numOffsets; i++) {
+          int position = i * Integer.BYTES;
+          offsetBuffer.putInt(position, offsetBuffer.getInt(position) + base);
+        }
+      } finally {
+        offsetBuffer.close();
+      }
+    }
+
+    /**
+     * @return length of the longest key added so far, or {@link Integer#MIN_VALUE} if no key was added
+     */
+    public int getMaxDictionaryValueLength() {
+      return _maxDictionaryValueLength;
+    }
+  }
+
+  /**
+   * Ad-hoc smoke test: flattens a sample JSON document, builds an index for it in a temporary directory, and then
+   * queries the index through the reader and the filter operator, printing every intermediate result.
+   * Only {@code json3} is indexed; the other samples are kept so the selected document can be switched by hand.
+   */
+  public static void main(String[] args)
+      throws Exception {
+
+    String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
+    String json1 =
+        " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\"  } }";
+    String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
+    String json3 = "{\n" + "  \"name\" : \"adam\",\n" + "  \"age\" : 30,\n" + "  \"country\" : \"us\",\n"
+        + "  \"addresses\" : [{\n" + "    \"number\" : 1,\n" + "    \"street\" : \"main st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 2,\n" + "    \"street\" : \"second st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 3,\n" + "    \"street\" : \"third st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }]\n" + "}\n";
+
+    String json4 =
+        "{\n" + "    \"year\": [\n" + "        2018\n" + "    ],\n" + "    \"customers\": [\n" + "        {\n"
+            + "            \"name\": \"John\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"333-3334\"\n"
+            + "                }\n" + "            ]\n" + "        },\n" + "        {\n"
+            + "            \"name\": \"Jane\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"555-5556\"\n"
+            + "                }\n" + "            ],\n" + "            \"surname\": \"Shaw\"\n" + "        }\n"
+            + "    ]\n" + "}";
+
+    String json5 = "{ \n" + "  \"accounting\" : [   \n" + "                     { \"firstName\" : \"John\",  \n"
+        + "                       \"lastName\"  : \"Doe\",\n" + "                       \"age\"       : 23 },\n" + "\n"
+        + "                     { \"firstName\" : \"Mary\",  \n" + "                       \"lastName\"  : \"Smith\",\n"
+        + "                        \"age\"      : 32 }\n" + "                 ],                            \n"
+        + "  \"sales\"      : [ \n" + "                     { \"firstName\" : \"Sally\", \n"
+        + "                       \"lastName\"  : \"Green\",\n" + "                        \"age\"      : 27 },\n"
+        + "\n" + "                     { \"firstName\" : \"Jim\",   \n"
+        + "                       \"lastName\"  : \"Galley\",\n" + "                       \"age\"       : 41 }\n"
+        + "                 ] \n" + "} ";
+
+    String json = json3;
+    System.out.println("json = " + json);
+    JsonNode rawJsonNode = new ObjectMapper().readTree(json);
+
+    System.out.println(
+        "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
+    String flattenJson = JsonFlattener.flatten(json);
+
+    System.out.println("flattenJson = " + flattenJson);
+    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
+
+    System.out
+        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
+    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
+    FieldSpec fieldSpec = new DimensionFieldSpec();
+    fieldSpec.setName("person");
+    File tempDir = Files.createTempDir();
+    JSONIndexCreator creator = new JSONIndexCreator(tempDir, fieldSpec, FieldSpec.DataType.BYTES);
+    // unnestJson is static, so it is called on the class rather than through the creator instance
+    List<Map<String, String>> maps = unnestJson(rawJsonNode);
+    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
+    // encode explicitly instead of relying on the platform default charset
+    creator.add(json.getBytes(Charset.forName("UTF-8")));
+
+    creator.seal();
+    System.out.println("Output Dir = " + tempDir);
+    System.out.println("FileUtils.listFiles(tempDir, null, true) = " + FileUtils.listFiles(tempDir, null, true).stream()
+        .map(File::getName).collect(Collectors.toList()));
+
+    //Test reader
+    PinotDataBuffer buffer =
+        PinotDataBuffer.mapReadOnlyBigEndianFile(new File(tempDir, fieldSpec.getName() + ".nested.idx"));
+    JSONIndexReader reader = new JSONIndexReader(buffer);
+    ExpressionContext lhs = ExpressionContext.forIdentifier("person");
+    Predicate predicate = new EqPredicate(lhs, "addresses.street" + POSTING_LIST_KEY_SEPARATOR + "third st");
+    MutableRoaringBitmap matchingDocIds = reader.getMatchingDocIds(predicate);
+    System.out.println("matchingDocIds = " + matchingDocIds);
+
+    //Test filter operator
+    FilterContext filterContext = QueryContextConverterUtils
+        .getFilter(CalciteSqlParser.compileToExpression("name='adam' AND addresses.street='main st'"));
+    int numDocs = 1;
+    JSONMatchFilterOperator operator = new JSONMatchFilterOperator("person", filterContext, reader, numDocs);
+    FilterBlock filterBlock = operator.nextBlock();
+    BlockDocIdIterator iterator = filterBlock.getBlockDocIdSet().iterator();
+    int docId;
+    // doc ids start at 0, so 0 is a valid match; the iterator signals exhaustion with a negative sentinel
+    while ((docId = iterator.next()) >= 0) {
+      System.out.println("docId = " + docId);
+    }
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
deleted file mode 100644
index adfa19d..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.creator.impl.inv;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.wnameless.json.flattener.JsonFlattener;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.pinot.spi.data.FieldSpec;
-
-
-public class NestedObjectIndexCreator {
-
-  public NestedObjectIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType) {
-
-  }
-
-  private static List<Map<String, String>> unnestJson(JsonNode root) {
-    Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
-    Map<String, String> flattenedSingleValuesMap = new TreeMap<>();
-    Map<String, JsonNode> arrNodes = new TreeMap<>();
-    Map<String, JsonNode> objectNodes = new TreeMap<>();
-    List<Map<String, String>> resultList = new ArrayList<>();
-    List<Map<String, String>> tempResultList = new ArrayList<>();
-    while (fields.hasNext()) {
-      Map.Entry<String, JsonNode> child = fields.next();
-      if (child.getValue().isValueNode()) {
-        //Normal value node
-        flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText());
-      } else if (child.getValue().isArray()) {
-        //Array Node: Process these nodes later
-        arrNodes.put(child.getKey(), child.getValue());
-      } else {
-        //Object Node
-        objectNodes.put(child.getKey(), child.getValue());
-      }
-    }
-    for (String objectNodeKey : objectNodes.keySet()) {
-      JsonNode objectNode = objectNodes.get(objectNodeKey);
-      modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode);
-    }
-    if (tempResultList.isEmpty()) {
-      tempResultList.add(flattenedSingleValuesMap);
-    }
-    if (!arrNodes.isEmpty()) {
-      for (Map<String, String> flattenedMapElement : tempResultList) {
-        for (String arrNodeKey : arrNodes.keySet()) {
-          JsonNode arrNode = arrNodes.get(arrNodeKey);
-          for (JsonNode arrNodeElement : arrNode) {
-            modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement);
-          }
-        }
-      }
-    } else {
-      resultList.addAll(tempResultList);
-    }
-    return resultList;
-  }
-
-  private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
-      String arrNodeKey, JsonNode arrNode) {
-    List<Map<String, String>> objectResult = unnestJson(arrNode);
-    for (Map<String, String> flattenedObject : objectResult) {
-      Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap);
-      for (Map.Entry<String, String> entry : flattenedObject.entrySet()) {
-        flattenedObjectCopy.put(arrNodeKey + "." + entry.getKey(), entry.getValue());
-      }
-      resultList.add(flattenedObjectCopy);
-    }
-  }
-
-  public void add(byte[] data)
-      throws IOException {
-
-    JsonNode jsonNode = new ObjectMapper().readTree(data);
-    List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
-    for (Map<String, String> map : flattenedMapList) {
-
-    }
-  }
-
-  public static void main(String[] args)
-      throws IOException {
-
-    String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
-    String json1 =
-        " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\"  } }";
-    String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
-    String json3 = "{\n" + "  \"name\" : \"adam\",\n" + "  \"age\" : 30,\n" + "  \"country\" : \"us\",\n"
-        + "  \"addresses\" : [{\n" + "    \"number\" : 1,\n" + "    \"street\" : \"main st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 2,\n" + "    \"street\" : \"second st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 3,\n" + "    \"street\" : \"third st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }]\n" + "}\n";
-
-    String json4 =
-        "{\n" + "    \"year\": [\n" + "        2018\n" + "    ],\n" + "    \"customers\": [\n" + "        {\n"
-            + "            \"name\": \"John\",\n" + "            \"contact\": [\n" + "                {\n"
-            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"333-3334\"\n"
-            + "                }\n" + "            ]\n" + "        },\n" + "        {\n"
-            + "            \"name\": \"Jane\",\n" + "            \"contact\": [\n" + "                {\n"
-            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"555-5556\"\n"
-            + "                }\n" + "            ],\n" + "            \"surname\": \"Shaw\"\n" + "        }\n"
-            + "    ]\n" + "}";
-
-    String json5 = "{ \n" + "  \"accounting\" : [   \n" + "                     { \"firstName\" : \"John\",  \n"
-        + "                       \"lastName\"  : \"Doe\",\n" + "                       \"age\"       : 23 },\n" + "\n"
-        + "                     { \"firstName\" : \"Mary\",  \n" + "                       \"lastName\"  : \"Smith\",\n"
-        + "                        \"age\"      : 32 }\n" + "                 ],                            \n"
-        + "  \"sales\"      : [ \n" + "                     { \"firstName\" : \"Sally\", \n"
-        + "                       \"lastName\"  : \"Green\",\n" + "                        \"age\"      : 27 },\n"
-        + "\n" + "                     { \"firstName\" : \"Jim\",   \n"
-        + "                       \"lastName\"  : \"Galley\",\n" + "                       \"age\"       : 41 }\n"
-        + "                 ] \n" + "} ";
-
-    String json = json3;
-    System.out.println("json = " + json);
-    JsonNode rawJsonNode = new ObjectMapper().readTree(json);
-
-    System.out.println(
-        "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
-    String flattenJson = JsonFlattener.flatten(json);
-
-    System.out.println("flattenJson = " + flattenJson);
-    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
-
-    System.out
-        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
-    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
-    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
-    NestedObjectIndexCreator creator = new NestedObjectIndexCreator(null, null, null);
-    List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
-    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
-//    creator.add(json.getBytes());
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index a6817a0..ecf7e30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -47,6 +47,7 @@ public class IndexLoadingConfig {
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
+  private Set<String> _jsonIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
   private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
@@ -92,6 +93,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+    if (jsonIndexColumns != null) {
+      _jsonIndexColumns.addAll(jsonIndexColumns);
+    }
+
     List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
     if (rangeIndexColumns != null) {
       _rangeIndexColumns.addAll(rangeIndexColumns);
@@ -222,6 +228,10 @@ public class IndexLoadingConfig {
     return _invertedIndexColumns;
   }
 
+  /**
+   * Returns the columns collected from the indexing config's JSON index column list.
+   */
+  public Set<String> getJsonIndexColumns() {
+    return _jsonIndexColumns;
+  }
+
   public Set<String> getRangeIndexColumns() {
     return _rangeIndexColumns;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..06a7adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.JSONIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -113,6 +114,11 @@ public class SegmentPreProcessor implements AutoCloseable {
           new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
       rangeIndexHandler.createRangeIndices();
 
+      // Create json indices according to the index config.
+      JSONIndexHandler jsonIndexHandler =
+          new JSONIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+      jsonIndexHandler.createJsonIndices();
+
       Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
       if (textIndexColumns.size() > 0) {
         TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
new file mode 100644
index 0000000..90ca344
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class JSONIndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JSONIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _jsonIndexColumns = new HashSet<>();
+
+  public JSONIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Keep configured columns that have metadata in this segment and are unsorted; no dictionary check is made here
+    for (String column : indexLoadingConfig.getJsonIndexColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
+        _jsonIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  /** Builds the JSON index for each column selected by the constructor; the first failure propagates. */
+  public void createJsonIndices() throws IOException {
+    for (ColumnMetadata jsonColumnMetadata : _jsonIndexColumns) {
+      createJSONIndexForColumn(jsonColumnMetadata);
+    }
+  }
+
+  /**
+   * Creates the JSON index for one column, using a ".inprogress" marker file to detect and redo an interrupted run.
+   * Returns without building anything when no marker exists and the segment already has a JSON index for the column.
+   */
+  private void createJSONIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File jsonIndexFile = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION);
+    File inProgress = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION + ".inprogress");
+
+    if (inProgress.exists()) {
+      // A leftover marker means the previous run was interrupted: discard whatever it left behind.
+      // For v1 and v2 that file is the actual json index; for v3 it is the temporary json index.
+      FileUtils.deleteQuietly(jsonIndexFile);
+    } else {
+      // No marker: the previous run ended normally, so an index that is already present is kept as is.
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.JSON_INDEX)) {
+        LOGGER.info("Found json index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create the marker before doing any work so that an interruption from here on is detectable.
+      FileUtils.touch(inProgress);
+    }
+
+    // Build the index with the handler matching the column's encoding.
+    LOGGER.info("Creating new json index for segment: {}, column: {}", _segmentName, column);
+    boolean dictionaryEncoded = columnMetadata.hasDictionary();
+    if (!dictionaryEncoded) {
+      handleNonDictionaryBasedColumn(columnMetadata);
+    } else {
+      handleDictionaryBasedColumn(columnMetadata);
+    }
+
+    // For v3, write the generated json index file into the single file and remove it.
+    if (SegmentVersion.v3 == _segmentVersion) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+    }
+
+    // The index is complete: clear the marker.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created json index for segment: {}, column: {}", _segmentName, column);
+  }
+
+  /**
+   * Builds the JSON index of a dictionary-encoded column by resolving every document's dictionary id to its bytes.
+   * Multi-value columns are rejected with an {@link IllegalStateException}.
+   */
+  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter);
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            FieldSpec.DataType.BYTES)) {
+      if (!columnMetadata.isSingleValue()) {
+        throw new IllegalStateException("JSON Indexing is not supported on multi-valued columns ");
+      }
+      for (int docId = 0; docId < numDocs; docId++) {
+        jsonIndexCreator.add(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext)));
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  /**
+   * Builds the JSON index of a raw (non dictionary-encoded) column by feeding every document's bytes to the creator.
+   *
+   * @throws UnsupportedOperationException if the column is neither STRING nor BYTES
+   * @throws IllegalStateException if the column is multi-valued
+   */
+  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    FieldSpec.DataType dataType = columnMetadata.getDataType();
+    // Reject only when the type is neither BYTES nor STRING. The two tests must be joined with "&&": joined with
+    // "||" the condition holds for every type and all columns would be rejected.
+    if (dataType != FieldSpec.DataType.BYTES && dataType != FieldSpec.DataType.STRING) {
+      throw new UnsupportedOperationException(
+          "JSON indexing is only supported for STRING/BYTES datatype but found: " + dataType);
+    }
+
+    int numDocs = columnMetadata.getTotalDocs();
+    // NOTE(review): getForwardIndexReader() only builds fixed-bit readers; confirm that getBytes() works on them
+    // for a raw column, or whether a raw forward index reader is required here.
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            dataType)) {
+      if (!columnMetadata.isSingleValue()) {
+        throw new IllegalStateException("JSON Indexing is not supported on multi-valued columns ");
+      }
+      // dataType is STRING or BYTES at this point; both are read as bytes.
+      for (int i = 0; i < numDocs; i++) {
+        jsonIndexCreator.add(forwardIndexReader.getBytes(i, readerContext));
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  /** Opens a fixed-bit forward index reader. NOTE(review): also used for columns without a dictionary — confirm. */
+  private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer fwdBuffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+    int totalDocs = columnMetadata.getTotalDocs();
+    int bitsPerValue = columnMetadata.getBitsPerElement();
+    if (!columnMetadata.isSingleValue()) {
+      int totalEntries = columnMetadata.getTotalNumberOfEntries();
+      return new FixedBitMVForwardIndexReader(fwdBuffer, totalDocs, totalEntries, bitsPerValue);
+    }
+    return new FixedBitSVForwardIndexReader(fwdBuffer, totalDocs, bitsPerValue);
+  }
+
+  /** Loads the column's dictionary from the DICTIONARY index buffer of the segment directory. */
+  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer dictBuffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    return PhysicalColumnIndexContainer.loadDictionary(dictBuffer, columnMetadata, false);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
new file mode 100644
index 0000000..efb6c8b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexReader implements Closeable {
+
+  private static final int EXPECTED_VERSION = 1;
+  private static final int DICT_HEADER_INDEX = 0;
+  private static final int DICT_OFFSET_INDEX = 1;
+  private static final int DICT_DATA_INDEX = 2;
+  private static final int INV_OFFSET_INDEX = 3;
+  private static final int INV_DATA_INDEX = 4;
+  private static final int FLATTENED_2_ROOT_INDEX = 5;
+  // The *_INDEX constants above are positions in the header's (start offset, size) table read by the constructor.
+  private final BitmapInvertedIndexReader invertedIndexReader;
+  private final StringDictionary dictionary;
+  private final long cardinality;
+  private final long numFlattenedDocs;
+
+  /**
+   * Opens a JSON index stored in the given buffer.
+   *
+   * <p>Layout as read here: an int version, an int max key length, then six (start offset, size) long pairs, one per
+   * sub-buffer in this order: dictionary header, dictionary offsets, dictionary data, inverted index offsets,
+   * inverted index data, flattened-doc-id to doc-id mapping.
+   *
+   * @param pinotDataBuffer buffer holding the whole JSON index
+   * @throws IllegalStateException if the stored version is not {@code EXPECTED_VERSION}
+   */
+  public JSONIndexReader(PinotDataBuffer pinotDataBuffer) {
+    int version = pinotDataBuffer.getInt(0);
+    int maxKeyLength = pinotDataBuffer.getInt(Integer.BYTES);
+
+    // Preconditions takes "%s" templates and only formats the message when the check fails.
+    Preconditions.checkState(version == EXPECTED_VERSION,
+        "Index version:%s is not supported by this reader. expected version:%s", version, EXPECTED_VERSION);
+
+    // dictionaryHeaderFile, dictionaryOffsetFile, dictionaryFile, invIndexOffsetFile, invIndexFile,
+    // FlattenedDocId2DocIdMappingFile
+    int numBuffers = 6;
+    long[] bufferStartOffsets = new long[numBuffers];
+    long[] bufferSizeArray = new long[numBuffers];
+    int tableStart = 2 * Integer.BYTES;
+    for (int i = 0; i < numBuffers; i++) {
+      int entryOffset = tableStart + 2 * i * Long.BYTES;
+      bufferStartOffsets[i] = pinotDataBuffer.getLong(entryOffset);
+      bufferSizeArray[i] = pinotDataBuffer.getLong(entryOffset + Long.BYTES);
+    }
+    // Number of int-sized entries in the dictionary offset buffer, minus one.
+    cardinality = bufferSizeArray[DICT_OFFSET_INDEX] / Integer.BYTES - 1;
+    numFlattenedDocs = bufferSizeArray[FLATTENED_2_ROOT_INDEX] / Integer.BYTES;
+
+    // The three dictionary sub-buffers are viewed as one region starting at the dictionary header.
+    long dictionaryStartOffset = bufferStartOffsets[DICT_HEADER_INDEX];
+    long dictionarySize =
+        bufferSizeArray[DICT_HEADER_INDEX] + bufferSizeArray[DICT_OFFSET_INDEX] + bufferSizeArray[DICT_DATA_INDEX];
+    PinotDataBuffer dictionaryBuffer =
+        pinotDataBuffer.view(dictionaryStartOffset, dictionaryStartOffset + dictionarySize);
+    dictionary = new StringDictionary(dictionaryBuffer, (int) cardinality, maxKeyLength, (byte) 0);
+
+    // Likewise the inverted index offsets and data are viewed as one region.
+    long invIndexStartOffset = bufferStartOffsets[INV_OFFSET_INDEX];
+    long invIndexSize = bufferSizeArray[INV_OFFSET_INDEX] + bufferSizeArray[INV_DATA_INDEX];
+    PinotDataBuffer invIndexBuffer = pinotDataBuffer.view(invIndexStartOffset, invIndexStartOffset + invIndexSize);
+    invertedIndexReader = new BitmapInvertedIndexReader(invIndexBuffer, (int) cardinality);
+  }
+
+  /**
+   * Evaluates the predicate against the index dictionary and returns the union of the posting lists of the selected
+   * dictionary ids, complemented over [0, numFlattenedDocs) when the predicate evaluator is exclusive.
+   */
+  public MutableRoaringBitmap getMatchingDocIds(Predicate predicate) {
+    PredicateEvaluator evaluator =
+        PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dictionary, FieldSpec.DataType.BYTES);
+    boolean exclusive = evaluator.isExclusive();
+    int[] dictIds = exclusive ? evaluator.getNonMatchingDictIds() : evaluator.getMatchingDictIds();
+
+    if (dictIds.length != 1) {
+      // Zero or several dictionary ids: OR their posting lists, then complement if needed.
+      ImmutableRoaringBitmap[] postingLists = new ImmutableRoaringBitmap[dictIds.length];
+      for (int i = 0; i < postingLists.length; i++) {
+        postingLists[i] = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[i]);
+      }
+      MutableRoaringBitmap union = ImmutableRoaringBitmap.or(postingLists);
+      if (exclusive) {
+        union.flip(0L, numFlattenedDocs);
+      }
+      return union;
+    }
+
+    // Exactly one dictionary id: work on its posting list directly.
+    ImmutableRoaringBitmap postingList = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[0]);
+    if (!exclusive) {
+      return postingList.toMutableRoaringBitmap();
+    }
+    if (postingList instanceof MutableRoaringBitmap) {
+      // Flipped in place, so the bitmap handed out by the inverted index reader is itself modified.
+      MutableRoaringBitmap mutablePostingList = (MutableRoaringBitmap) postingList;
+      mutablePostingList.flip(0L, numFlattenedDocs);
+      return mutablePostingList;
+    }
+    return ImmutableRoaringBitmap.flip(postingList, 0L, numFlattenedDocs);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // No-op. NOTE(review): the PinotDataBuffer passed to the constructor is not closed here — confirm its owner does.
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..f9063b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,7 +25,8 @@ public enum ColumnIndexType {
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
   TEXT_INDEX("text_index"),
-  RANGE_INDEX("range_index");
+  RANGE_INDEX("range_index"),
+  JSON_INDEX("json_index");
 
   private final String indexName;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 51dd2fb..4ffc133 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -135,6 +136,10 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case TEXT_INDEX:
         filename = column + LuceneTextIndexCreator.LUCENE_TEXT_INDEX_FILE_EXTENSION;
         break;
+      case JSON_INDEX:
+        filename = column + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+        break;
+
       default:
         throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3dd137b..7965654 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
   private List<String> _rangeIndexColumns;
+  private List<String> _jsonIndexColumns;
   private boolean _autoGeneratedInvertedIndex;
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
@@ -71,6 +72,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _rangeIndexColumns = rangeIndexColumns;
   }
 
+  public List<String> getJsonIndexColumns() {
+    return _jsonIndexColumns;
+  }
+
+  public void setJsonIndexColumns(List<String> jsonIndexColumns) {
+    _jsonIndexColumns = jsonIndexColumns;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..c9c245b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -337,7 +337,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec> {
   public enum DataType {
     // LIST is for complex lists which is different from multi-value column of primitives
     // STRUCT, MAP and LIST are composable to form a COMPLEX field
-    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST;
+    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST, JSON;
 
     /**
      * Returns the data type stored in Pinot.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/02: Initial commit

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit fd32a3e2df4ee4373e95a18493473b75b823ccbf
Author: kishoreg <g....@gmail.com>
AuthorDate: Wed Oct 7 08:25:34 2020 -0700

    Initial commit
---
 pinot-core/pom.xml                                 |   5 +
 .../creator/impl/inv/NestedObjectIndexCreator.java | 158 +++++++++++++++++++++
 2 files changed, 163 insertions(+)

diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index c3bc4a1..4980269 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -164,6 +164,11 @@
       <artifactId>json-path</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.github.wnameless.json</groupId>
+      <artifactId>json-flattener</artifactId>
+      <version>0.9.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.locationtech.jts</groupId>
       <artifactId>jts-core</artifactId>
     </dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
new file mode 100644
index 0000000..adfa19d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class NestedObjectIndexCreator {
+
+  public NestedObjectIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType) {
+    // Placeholder: none of the arguments is stored or used yet.
+  }
+
+  /**
+   * Flattens a JSON node into a list of flat key/value maps. Value fields are copied as text, nested objects are
+   * flattened recursively under "parentKey.childKey", and every element of every array field yields its own maps.
+   * NOTE(review): array elements that are plain values appear to contribute no key of their own — confirm intent.
+   */
+  private static List<Map<String, String>> unnestJson(JsonNode root) {
+    Map<String, String> scalarFields = new TreeMap<>();
+    Map<String, JsonNode> arrayFields = new TreeMap<>();
+    Map<String, JsonNode> objectFields = new TreeMap<>();
+    for (Iterator<Map.Entry<String, JsonNode>> it = root.fields(); it.hasNext(); ) {
+      Map.Entry<String, JsonNode> field = it.next();
+      JsonNode value = field.getValue();
+      if (value.isValueNode()) {
+        scalarFields.put(field.getKey(), value.asText());
+      } else if (value.isArray()) {
+        arrayFields.put(field.getKey(), value);
+      } else {
+        objectFields.put(field.getKey(), value);
+      }
+    }
+    List<Map<String, String>> objectExpanded = new ArrayList<>();
+    for (Map.Entry<String, JsonNode> objectField : objectFields.entrySet()) {
+      modifyKeysInMap(scalarFields, objectExpanded, objectField.getKey(), objectField.getValue());
+    }
+    if (objectExpanded.isEmpty()) {
+      objectExpanded.add(scalarFields);
+    }
+    if (arrayFields.isEmpty()) {
+      return objectExpanded;
+    }
+    // Arrays are handled last: each element is expanded on top of every map produced so far.
+    List<Map<String, String>> expanded = new ArrayList<>();
+    for (Map<String, String> base : objectExpanded) {
+      for (Map.Entry<String, JsonNode> arrayField : arrayFields.entrySet()) {
+        for (JsonNode element : arrayField.getValue()) {
+          modifyKeysInMap(base, expanded, arrayField.getKey(), element);
+        }
+      }
+    }
+    return expanded;
+  }
+
+  /** Adds to resultList one copy of flattenedMap per map flattened from arrNode, with keys prefixed by arrNodeKey. */
+  private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
+      String arrNodeKey, JsonNode arrNode) {
+    for (Map<String, String> nested : unnestJson(arrNode)) {
+      Map<String, String> merged = new TreeMap<>(flattenedMap);
+      for (Map.Entry<String, String> nestedEntry : nested.entrySet()) {
+        merged.put(arrNodeKey + "." + nestedEntry.getKey(), nestedEntry.getValue());
+      }
+      resultList.add(merged);
+    }
+  }
+
+  public void add(byte[] data)
+      throws IOException {
+    // Parses the document and flattens it; nothing is written to an index yet.
+    JsonNode jsonNode = new ObjectMapper().readTree(data);
+    List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+    for (Map<String, String> map : flattenedMapList) {
+      // TODO: index the flattened map; it is currently discarded.
+    }
+  }
+
+  public static void main(String[] args)
+      throws IOException {
+    // Ad-hoc demo: flattens the json3 sample with JsonFlattener and with unnestJson, printing each intermediate form.
+    String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
+    String json1 =
+        " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\"  } }";
+    String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
+    String json3 = "{\n" + "  \"name\" : \"adam\",\n" + "  \"age\" : 30,\n" + "  \"country\" : \"us\",\n"
+        + "  \"addresses\" : [{\n" + "    \"number\" : 1,\n" + "    \"street\" : \"main st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 2,\n" + "    \"street\" : \"second st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 3,\n" + "    \"street\" : \"third st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }]\n" + "}\n";
+
+    String json4 =
+        "{\n" + "    \"year\": [\n" + "        2018\n" + "    ],\n" + "    \"customers\": [\n" + "        {\n"
+            + "            \"name\": \"John\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"333-3334\"\n"
+            + "                }\n" + "            ]\n" + "        },\n" + "        {\n"
+            + "            \"name\": \"Jane\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"555-5556\"\n"
+            + "                }\n" + "            ],\n" + "            \"surname\": \"Shaw\"\n" + "        }\n"
+            + "    ]\n" + "}";
+
+    String json5 = "{ \n" + "  \"accounting\" : [   \n" + "                     { \"firstName\" : \"John\",  \n"
+        + "                       \"lastName\"  : \"Doe\",\n" + "                       \"age\"       : 23 },\n" + "\n"
+        + "                     { \"firstName\" : \"Mary\",  \n" + "                       \"lastName\"  : \"Smith\",\n"
+        + "                        \"age\"      : 32 }\n" + "                 ],                            \n"
+        + "  \"sales\"      : [ \n" + "                     { \"firstName\" : \"Sally\", \n"
+        + "                       \"lastName\"  : \"Green\",\n" + "                        \"age\"      : 27 },\n"
+        + "\n" + "                     { \"firstName\" : \"Jim\",   \n"
+        + "                       \"lastName\"  : \"Galley\",\n" + "                       \"age\"       : 41 }\n"
+        + "                 ] \n" + "} ";
+
+    String json = json3;
+    System.out.println("json = " + json);
+    JsonNode rawJsonNode = new ObjectMapper().readTree(json);
+
+    System.out.println(
+        "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
+    String flattenJson = JsonFlattener.flatten(json);
+
+    System.out.println("flattenJson = " + flattenJson);
+    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
+
+    System.out
+        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
+    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
+    NestedObjectIndexCreator creator = new NestedObjectIndexCreator(null, null, null);
+    List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
+    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
+//    creator.add(json.getBytes());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org