Posted to commits@pinot.apache.org by ki...@apache.org on 2020/10/28 20:00:02 UTC

[incubator-pinot] 02/02: Adding index creator and reader

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch json-indexing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit bdfd3f105d170f3dc02f0a25feb3b922fe3ffc6c
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 18 22:05:51 2020 -0700

    Adding index creator and reader
---
 .../io/util/VarLengthBytesValueReaderWriter.java   |  28 +-
 .../operator/filter/BitmapBasedFilterOperator.java |   4 +-
 .../core/operator/filter/FilterOperatorUtils.java  |   2 +-
 .../operator/filter/JSONMatchFilterOperator.java   | 138 +++++
 .../transform/function/CastTransformFunction.java  |   2 +-
 .../{Predicate.java => JSONMatchPredicate.java}    |  57 +-
 .../query/request/context/predicate/Predicate.java |   3 +-
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../segment/creator/impl/inv/JSONIndexCreator.java | 656 +++++++++++++++++++++
 .../creator/impl/inv/NestedObjectIndexCreator.java | 158 -----
 .../segment/index/loader/IndexLoadingConfig.java   |  10 +
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +
 .../loader/invertedindex/JSONIndexHandler.java     | 205 +++++++
 .../segment/index/readers/JSONIndexReader.java     | 147 +++++
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |   5 +
 .../pinot/spi/config/table/IndexingConfig.java     |   9 +
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   2 +-
 18 files changed, 1249 insertions(+), 187 deletions(-)
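
The patch below builds the JSON index by flattening each nested document into
one or more flat key/value maps (one map per array element) and mapping every
key and "key|value" pair to a posting list of flattened doc ids. A minimal
sketch of the composite key format, using only the separator constant
introduced below (the document and doc ids are illustrative):

    // {"name":"adam","addresses":[{"street":"main st"},{"street":"third st"}]}
    // flattens into records 0 and 1 and produces posting lists such as:
    //   "name|adam"                 -> {0, 1}
    //   "addresses.street|main st"  -> {0}
    //   "addresses.street|third st" -> {1}
    String key = "addresses.street" + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + "third st";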

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 5a6a25a..88f56f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.io.util;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -63,19 +66,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
   /**
    * Magic bytes used to identify the dictionary files written in variable length bytes format.
    */
-  private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+  public static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
 
   /**
    * Increment this version if there are any structural changes in the store format and
    * deal with backward compatibility correctly based on old versions.
    */
-  private static final int VERSION = 1;
+  public static final int VERSION = 1;
 
   // Offsets of different fields in the header. Having as constants for readability.
-  private static final int VERSION_OFFSET = MAGIC_BYTES.length;
-  private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
-  private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
-  private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+  public static final int VERSION_OFFSET = MAGIC_BYTES.length;
+  public static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+  public static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+  public static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
 
   private final PinotDataBuffer _dataBuffer;
 
@@ -144,6 +147,19 @@ public class VarLengthBytesValueReaderWriter implements ValueReader {
     return false;
   }
 
+  public static byte[] getHeaderBytes(int numElements)
+      throws IOException {
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream(HEADER_LENGTH);
+    DataOutputStream dos = new DataOutputStream(out);
+    dos.write(MAGIC_BYTES);
+    dos.writeInt(VERSION);
+    dos.writeInt(numElements);
+    dos.writeInt(HEADER_LENGTH);
+    dos.close();
+    return out.toByteArray();
+  }
+
   private void writeHeader() {
     for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
       _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
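
With the magic bytes, version, and offset constants now public, JSONIndexCreator
(below) can emit a dictionary header itself via getHeaderBytes(). A small sketch
of reading those fields back, assuming the big-endian layout DataOutputStream
produces (the element count 42 is illustrative):

    byte[] header = VarLengthBytesValueReaderWriter.getHeaderBytes(42);
    ByteBuffer buf = ByteBuffer.wrap(header);  // ByteBuffer defaults to big-endian
    int version = buf.getInt(VarLengthBytesValueReaderWriter.VERSION_OFFSET);                 // 1
    int numElements = buf.getInt(VarLengthBytesValueReaderWriter.NUM_ELEMENTS_OFFSET);        // 42
    int dataStart = buf.getInt(VarLengthBytesValueReaderWriter.DATA_SECTION_OFFSET_POSITION); // HEADER_LENGTH
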
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index d9c25e1..9c307a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -38,9 +38,9 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {
   private final boolean _exclusive;
   private final int _numDocs;
 
-  BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) {
+  public BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, InvertedIndexReader invertedIndexReader, int numDocs) {
     _predicateEvaluator = predicateEvaluator;
-    _invertedIndexReader = dataSource.getInvertedIndex();
+    _invertedIndexReader = invertedIndexReader;
     _docIds = null;
     _exclusive = predicateEvaluator.isExclusive();
     _numDocs = numDocs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index 3bc676e..5076ef6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -65,7 +65,7 @@ public class FilterOperatorUtils {
         return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
       }
       if (dataSource.getInvertedIndex() != null) {
-        return new BitmapBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+        return new BitmapBasedFilterOperator(predicateEvaluator, dataSource.getInvertedIndex(), numDocs);
       }
       return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
new file mode 100644
index 0000000..adb0550
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JSONMatchFilterOperator.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.InPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.NotInPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+@SuppressWarnings("rawtypes")
+public class JSONMatchFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "JSONMatchFilterOperator";
+
+  private final JSONIndexReader _nestedObjectIndexReader;
+  private final int _numDocs;
+  private String _column;
+  private final FilterContext _filterContext;
+
+  public JSONMatchFilterOperator(String column, FilterContext filterContext, JSONIndexReader nestedObjectIndexReader,
+      int numDocs) {
+    _column = column;
+    _filterContext = filterContext;
+    _nestedObjectIndexReader = nestedObjectIndexReader;
+    _numDocs = numDocs;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    ImmutableRoaringBitmap docIds = process(_filterContext);
+    System.out.println("docIds = " + docIds);
+    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs));
+  }
+
+  private MutableRoaringBitmap process(FilterContext filterContext) {
+    List<FilterContext> children = filterContext.getChildren();
+    MutableRoaringBitmap resultBitmap = null;
+
+    switch (filterContext.getType()) {
+      case AND:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.and(process(child));
+          }
+        }
+        break;
+      case OR:
+        for (FilterContext child : children) {
+          if (resultBitmap == null) {
+            resultBitmap = process(child);
+          } else {
+            resultBitmap.or(process(child));
+          }
+        }
+        break;
+      case PREDICATE:
+        Predicate predicate = filterContext.getPredicate();
+        Predicate newPredicate = null;
+        switch (predicate.getType()) {
+
+          case EQ:
+            EqPredicate eqPredicate = (EqPredicate) predicate;
+            newPredicate = new EqPredicate(ExpressionContext.forIdentifier(_column),
+                eqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + eqPredicate
+                    .getValue());
+            break;
+          case NOT_EQ:
+            NotEqPredicate nEqPredicate = (NotEqPredicate) predicate;
+            newPredicate = new NotEqPredicate(ExpressionContext.forIdentifier(_column),
+                nEqPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + nEqPredicate.getValue());
+            break;
+          case IN:
+            InPredicate inPredicate = (InPredicate) predicate;
+            List<String> newInValues = inPredicate.getValues().stream().map(
+                value -> inPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new InPredicate(ExpressionContext.forIdentifier(_column), newInValues);
+            break;
+          case NOT_IN:
+            NotInPredicate notInPredicate = (NotInPredicate) predicate;
+            List<String> newNotInValues = notInPredicate.getValues().stream().map(
+                value -> notInPredicate.getLhs().getIdentifier() + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR
+                    + value).collect(Collectors.toList());
+            newPredicate = new NotInPredicate(ExpressionContext.forIdentifier(_column), newNotInValues);
+            break;
+          case IS_NULL:
+            newPredicate = predicate;
+            break;
+          case IS_NOT_NULL:
+            newPredicate = predicate;
+            break;
+          case RANGE:
+          case REGEXP_LIKE:
+          case TEXT_MATCH:
+            throw new UnsupportedOperationException("JSON_MATCH does not support RANGE, REGEXP_LIKE or TEXT_MATCH predicates");
+        }
+
+        resultBitmap = _nestedObjectIndexReader.getMatchingDocIds(newPredicate);
+        break;
+    }
+    return resultBitmap;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
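
The operator above evaluates JSON_MATCH by rewriting each leaf predicate so
that the JSON path and the literal collapse into one composite dictionary key
on the JSON column. A sketch with illustrative values, using only classes from
this patch:

    // name = 'adam' inside JSON_MATCH on column "person" becomes:
    EqPredicate rewritten = new EqPredicate(ExpressionContext.forIdentifier("person"),
        "name" + JSONIndexCreator.POSTING_LIST_KEY_SEPARATOR + "adam");  // key "name|adam"
    // JSONIndexReader.getMatchingDocIds(rewritten) then resolves the key through
    // the dictionary and returns the docs whose flattened records contain that pair.
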
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
index c826446..cb6227f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CastTransformFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToSDFTra
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 
 
-public class CastTransformFunction extends BaseTransformFunction {
+public class CastTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "cast";
 
   private TransformFunction _transformFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
index 203e950..97ad787 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/JSONMatchPredicate.java
@@ -18,30 +18,55 @@
  */
 package org.apache.pinot.core.query.request.context.predicate;
 
+import java.util.Objects;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 
 
 /**
- * The {@code Predicate} class represents the predicate in the filter.
- * <p>Currently the query engine only accepts string literals as the right-hand side of the predicate, so we store the
- * right-hand side of the predicate as string or list of strings.
+ * Predicate for JSON_MATCH.
  */
-public interface Predicate {
-  enum Type {
-    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+public class JSONMatchPredicate implements Predicate {
+  private final ExpressionContext _lhs;
+  private final String _value;
 
-    public boolean isExclusive() {
-      return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
+  public JSONMatchPredicate(ExpressionContext lhs, String value) {
+    _lhs = lhs;
+    _value = value;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.JSON_MATCH;
+  }
+
+  @Override
+  public ExpressionContext getLhs() {
+    return _lhs;
+  }
+
+  public String getValue() {
+    return _value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JSONMatchPredicate)) {
+      return false;
     }
+    JSONMatchPredicate that = (JSONMatchPredicate) o;
+    return Objects.equals(_lhs, that._lhs) && Objects.equals(_value, that._value);
   }
 
-  /**
-   * Returns the type of the predicate.
-   */
-  Type getType();
+  @Override
+  public int hashCode() {
+    return Objects.hash(_lhs, _value);
+  }
 
-  /**
-   * Returns the left-hand side expression of the predicate.
-   */
-  ExpressionContext getLhs();
+  @Override
+  public String toString() {
+    return "json_match(" + _lhs + ",'" + _value + "')";
+  }
 }
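
A quick usage sketch of the new predicate type (the match expression string is
illustrative):

    Predicate p = new JSONMatchPredicate(ExpressionContext.forIdentifier("person"), "name='adam'");
    // p.getType() == Predicate.Type.JSON_MATCH
    // p.toString() -> json_match(person,'name='adam'')
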
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
index 203e950..a204fc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/Predicate.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
  */
 public interface Predicate {
   enum Type {
-    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL;
+    EQ, NOT_EQ, IN, NOT_IN, RANGE, REGEXP_LIKE, TEXT_MATCH, IS_NULL, IS_NOT_NULL, JSON_MATCH;
 
     public boolean isExclusive() {
       return this == NOT_EQ || this == NOT_IN || this == IS_NOT_NULL;
@@ -44,4 +44,5 @@ public interface Predicate {
    * Returns the left-hand side expression of the predicate.
    */
   ExpressionContext getLhs();
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0498f8c..e745120 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+    public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
     public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
new file mode 100644
index 0000000..705f690
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import com.google.common.io.Files;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.JSONMatchFilterOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexCreator implements Closeable {
+
+  //separator used to join the key and value to create the posting list key
+  public static final String POSTING_LIST_KEY_SEPARATOR = "|";
+  static final int FLUSH_THRESHOLD = 50_000;
+  static final int VERSION = 1;
+  private final File flattenedDocId2RootDocIdMappingFile;
+  private final File postingListFile;
+  private File dictionaryheaderFile;
+  private File dictionaryOffsetFile;
+  private File dictionaryFile;
+  private File invertedIndexOffsetFile;
+  private File invertedIndexFile;
+  private File outputIndexFile;
+
+  private int docId = 0;
+  private int numFlatennedDocId = 0;
+  int chunkId = 0;
+
+  private DataOutputStream postingListWriter;
+  private DataOutputStream flattenedDocId2RootDocIdWriter;
+
+  Map<String, List<Integer>> postingListMap = new TreeMap<>();
+  List<Integer> flattenedDocIdList = new ArrayList<>();
+  List<Integer> postingListChunkOffsets = new ArrayList<>();
+  List<Integer> chunkLengths = new ArrayList<>();
+  private FieldSpec fieldSpec;
+
+  public JSONIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType)
+      throws IOException {
+    this.fieldSpec = fieldSpec;
+    System.out.println("indexDir = " + indexDir);
+
+    String name = fieldSpec.getName();
+    postingListFile = new File(indexDir, name + "_postingList.buf");
+    postingListWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(postingListFile)));
+    postingListChunkOffsets.add(postingListWriter.size());
+
+    dictionaryheaderFile = new File(indexDir, name + "_dictionaryHeader.buf");
+    dictionaryOffsetFile = new File(indexDir, name + "_dictionaryOffset.buf");
+    dictionaryFile = new File(indexDir, name + "_dictionary.buf");
+    invertedIndexOffsetFile = new File(indexDir, name + "_invertedIndexOffset.buf");
+    invertedIndexFile = new File(indexDir, name + "_invertedIndex.buf");
+    flattenedDocId2RootDocIdMappingFile = new File(indexDir, name + "_flattenedDocId.buf");
+    flattenedDocId2RootDocIdWriter =
+        new DataOutputStream(new BufferedOutputStream(new FileOutputStream(flattenedDocId2RootDocIdMappingFile)));
+
+    //output file
+    outputIndexFile = new File(indexDir, name + ".nested.idx");
+  }
+
+  public void add(byte[] data)
+      throws IOException {
+
+    JsonNode jsonNode = new ObjectMapper().readTree(data);
+    List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+    for (Map<String, String> map : flattenedMapList) {
+      //
+      for (Map.Entry<String, String> entry : map.entrySet()) {
+        //handle key posting list
+        String key = entry.getKey();
+
+        List<Integer> keyPostingList = postingListMap.get(key);
+        if (keyPostingList == null) {
+          keyPostingList = new ArrayList<>();
+          postingListMap.put(key, keyPostingList);
+        }
+        keyPostingList.add(numFlatennedDocId);
+
+        //handle keyvalue posting list
+        String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+        List<Integer> keyValuePostingList = postingListMap.get(keyValue);
+        if (keyValuePostingList == null) {
+          keyValuePostingList = new ArrayList<>();
+          postingListMap.put(keyValue, keyValuePostingList);
+        }
+        keyValuePostingList.add(numFlatennedDocId);
+      }
+      //flattenedDocId2RootDocIdMapping: record the root docId for this flattened record
+      flattenedDocIdList.add(docId);
+
+      numFlatennedDocId++;
+    }
+    docId++;
+
+    //flush data
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  /**
+   * Adds a multi-value entry, i.e. an array of serialized JSON documents belonging to one row.
+   * @param dataArray serialized JSON documents
+   * @param length number of valid entries in dataArray
+   * @throws IOException
+   */
+  public void add(byte[][] dataArray, int length)
+      throws IOException {
+
+    for (int i = 0; i < length; i++) {
+      byte[] data = dataArray[i];
+      JsonNode jsonNode = new ObjectMapper().readTree(data);
+      List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+      for (Map<String, String> map : flattenedMapList) {
+        //
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+          //handle key posting list
+          String key = entry.getKey();
+
+          List<Integer> keyPostingList = postingListMap.get(key);
+          if (keyPostingList == null) {
+            keyPostingList = new ArrayList<>();
+            postingListMap.put(key, keyPostingList);
+          }
+          keyPostingList.add(numFlatennedDocId);
+
+          //handle keyvalue posting list
+          String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+          List<Integer> keyValuePostingList = postingListMap.get(keyValue);
+          if (keyValuePostingList == null) {
+            keyValuePostingList = new ArrayList<>();
+            postingListMap.put(keyValue, keyValuePostingList);
+          }
+          keyValuePostingList.add(numFlatennedDocId);
+        }
+        //flattenedDocId2RootDocIdMapping: record the root docId for this flattened record
+        flattenedDocIdList.add(docId);
+
+        numFlatennedDocId++;
+      }
+    }
+    docId++;
+
+    //flush data
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  public void seal()
+      throws IOException {
+
+    flush();
+
+    flattenedDocId2RootDocIdWriter.close();
+    postingListWriter.close();
+
+    //key posting list merging
+    System.out.println("InvertedIndex");
+    System.out.println("=================");
+
+    int maxKeyLength = createInvertedIndex(postingListFile, postingListChunkOffsets, chunkLengths);
+    System.out.println("=================");
+
+    int flattenedDocid = 0;
+    DataInputStream flattenedDocId2RootDocIdReader =
+        new DataInputStream(new BufferedInputStream(new FileInputStream(flattenedDocId2RootDocIdMappingFile)));
+    int[] rootDocIdArray = new int[numFlatennedDocId];
+    while (flattenedDocid < numFlatennedDocId) {
+      rootDocIdArray[flattenedDocid++] = flattenedDocId2RootDocIdReader.readInt();
+    }
+    System.out.println("FlattenedDocId  to RootDocId Mapping = ");
+    System.out.println(Arrays.toString(rootDocIdArray));
+
+    //PUT all contents into one file
+
+    //header
+    // version + maxDictionaryLength + [offset and length for each of the six buffers: dictionary header, dictionary offsets, dictionary, inverted index offsets, inverted index, flattenedDocId -> rootDocId mapping]
+    long headerSize = 2 * Integer.BYTES + 6 * 2 * Long.BYTES;
+
+    long dataSize =
+        dictionaryheaderFile.length() + dictionaryOffsetFile.length() + dictionaryFile.length() + invertedIndexFile
+            .length() + invertedIndexOffsetFile.length() + flattenedDocId2RootDocIdMappingFile.length();
+
+    long totalSize = headerSize + dataSize;
+    PinotDataBuffer pinotDataBuffer =
+        PinotDataBuffer.mapFile(outputIndexFile, false, 0, totalSize, ByteOrder.BIG_ENDIAN, "Nested inverted index");
+
+    pinotDataBuffer.putInt(0, VERSION);
+    pinotDataBuffer.putInt(1 * Integer.BYTES, maxKeyLength);
+    long writtenBytes = headerSize;
+
+    //add dictionary header
+    int bufferId = 0;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryheaderFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryheaderFile, 0, dictionaryheaderFile.length());
+    writtenBytes += dictionaryheaderFile.length();
+
+    //add dictionary offset
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryOffsetFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryOffsetFile, 0, dictionaryOffsetFile.length());
+    writtenBytes += dictionaryOffsetFile.length();
+
+    //add dictionary
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, dictionaryFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, dictionaryFile, 0, dictionaryFile.length());
+    writtenBytes += dictionaryFile.length();
+
+    //add index offset
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexOffsetFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, invertedIndexOffsetFile, 0, invertedIndexOffsetFile.length());
+    writtenBytes += invertedIndexOffsetFile.length();
+
+    //add index data
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, invertedIndexFile.length());
+    pinotDataBuffer.readFrom(writtenBytes, invertedIndexFile, 0, invertedIndexFile.length());
+    writtenBytes += invertedIndexFile.length();
+
+    //add flattened docid to root doc id mapping
+    bufferId = bufferId + 1;
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId), writtenBytes);
+    pinotDataBuffer.putLong(getBufferStartOffset(bufferId) + Long.BYTES, flattenedDocId2RootDocIdMappingFile.length());
+    pinotDataBuffer
+        .readFrom(writtenBytes, flattenedDocId2RootDocIdMappingFile, 0, flattenedDocId2RootDocIdMappingFile.length());
+    writtenBytes += flattenedDocId2RootDocIdMappingFile.length();
+  }
+
+  private long getBufferStartOffset(int bufferId) {
+    return 2 * Integer.BYTES + 2 * bufferId * Long.BYTES;
+  }
+
+  private int createInvertedIndex(File postingListFile, List<Integer> postingListChunkOffsets,
+      List<Integer> chunkLengthList)
+      throws IOException {
+
+    List<Iterator<ImmutablePair<byte[], int[]>>> chunkIterators = new ArrayList<>();
+
+    for (int i = 0; i < chunkLengthList.size(); i++) {
+
+      final DataInputStream postingListFileReader =
+          new DataInputStream(new BufferedInputStream(new FileInputStream(postingListFile)));
+      postingListFileReader.skipBytes(postingListChunkOffsets.get(i));
+      final int length = chunkLengthList.get(i);
+      chunkIterators.add(new Iterator<ImmutablePair<byte[], int[]>>() {
+        int index = 0;
+
+        @Override
+        public boolean hasNext() {
+          return index < length;
+        }
+
+        @Override
+        public ImmutablePair<byte[], int[]> next() {
+          try {
+            int keyLength = postingListFileReader.readInt();
+            byte[] keyBytes = new byte[keyLength];
+            postingListFileReader.readFully(keyBytes);
+
+            int postingListLength = postingListFileReader.readInt();
+            int[] postingList = new int[postingListLength];
+            for (int i = 0; i < postingListLength; i++) {
+              postingList[i] = postingListFileReader.readInt();
+            }
+            index++;
+            return ImmutablePair.of(keyBytes, postingList);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+    final Comparator<byte[]> byteArrayComparator = UnsignedBytes.lexicographicalComparator();
+
+    PriorityQueue<ImmutablePair<Integer, ImmutablePair<byte[], int[]>>> queue =
+        new PriorityQueue<>(chunkLengthList.size(),
+            (o1, o2) -> byteArrayComparator.compare(o1.getRight().getLeft(), o2.getRight().getLeft()));
+    for (int i = 0; i < chunkIterators.size(); i++) {
+      Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(i);
+      if (iterator.hasNext()) {
+        queue.offer(ImmutablePair.of(i, iterator.next()));
+      }
+    }
+    byte[] prevKey = null;
+    RoaringBitmap roaringBitmap = new RoaringBitmap();
+
+    Writer writer = new Writer(dictionaryheaderFile, dictionaryOffsetFile, dictionaryFile, invertedIndexOffsetFile,
+        invertedIndexFile);
+    while (!queue.isEmpty()) {
+      ImmutablePair<Integer, ImmutablePair<byte[], int[]>> poll = queue.poll();
+      byte[] currKey = poll.getRight().getLeft();
+      if (prevKey != null && byteArrayComparator.compare(prevKey, currKey) != 0) {
+        System.out.println(new String(prevKey) + ":" + roaringBitmap);
+        writer.add(prevKey, roaringBitmap);
+        roaringBitmap.clear();
+      }
+
+      roaringBitmap.add(poll.getRight().getRight());
+      prevKey = currKey;
+
+      //add the next key from the chunk where the currKey was removed from
+      Iterator<ImmutablePair<byte[], int[]>> iterator = chunkIterators.get(poll.getLeft());
+      if (iterator.hasNext()) {
+        queue.offer(ImmutablePair.of(poll.getLeft(), iterator.next()));
+      }
+    }
+
+    if (prevKey != null) {
+      writer.add(prevKey, roaringBitmap);
+    }
+    writer.finish();
+    return writer.getMaxDictionaryValueLength();
+  }
+
+  private void flush()
+      throws IOException {
+    //write the key (length|actual bytes) - posting list(length, flattenedDocIds)
+    System.out.println("postingListMap = " + postingListMap);
+    for (Map.Entry<String, List<Integer>> entry : postingListMap.entrySet()) {
+      byte[] keyBytes = entry.getKey().getBytes(Charset.forName("UTF-8"));
+      postingListWriter.writeInt(keyBytes.length);
+      postingListWriter.write(keyBytes);
+      List<Integer> flattenedDocIdList = entry.getValue();
+      postingListWriter.writeInt(flattenedDocIdList.size());
+      for (int flattenedDocId : flattenedDocIdList) {
+        postingListWriter.writeInt(flattenedDocId);
+      }
+    }
+
+    //write flattened doc id to root docId mapping
+    for (int rootDocId : flattenedDocIdList) {
+      flattenedDocId2RootDocIdWriter.writeInt(rootDocId);
+    }
+    chunkLengths.add(postingListMap.size());
+    postingListChunkOffsets.add(postingListWriter.size());
+    postingListMap.clear();
+    flattenedDocIdList.clear();
+  }
+
+  private static List<Map<String, String>> unnestJson(JsonNode root) {
+    Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
+    Map<String, String> flattenedSingleValuesMap = new TreeMap<>();
+    Map<String, JsonNode> arrNodes = new TreeMap<>();
+    Map<String, JsonNode> objectNodes = new TreeMap<>();
+    List<Map<String, String>> resultList = new ArrayList<>();
+    List<Map<String, String>> tempResultList = new ArrayList<>();
+    while (fields.hasNext()) {
+      Map.Entry<String, JsonNode> child = fields.next();
+      if (child.getValue().isValueNode()) {
+        //Normal value node
+        flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText());
+      } else if (child.getValue().isArray()) {
+        //Array Node: Process these nodes later
+        arrNodes.put(child.getKey(), child.getValue());
+      } else {
+        //Object Node
+        objectNodes.put(child.getKey(), child.getValue());
+      }
+    }
+    for (String objectNodeKey : objectNodes.keySet()) {
+      JsonNode objectNode = objectNodes.get(objectNodeKey);
+      modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode);
+    }
+    if (tempResultList.isEmpty()) {
+      tempResultList.add(flattenedSingleValuesMap);
+    }
+    if (!arrNodes.isEmpty()) {
+      for (Map<String, String> flattenedMapElement : tempResultList) {
+        for (String arrNodeKey : arrNodes.keySet()) {
+          JsonNode arrNode = arrNodes.get(arrNodeKey);
+          for (JsonNode arrNodeElement : arrNode) {
+            modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement);
+          }
+        }
+      }
+    } else {
+      resultList.addAll(tempResultList);
+    }
+    return resultList;
+  }
+
+  private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
+      String arrNodeKey, JsonNode arrNode) {
+    List<Map<String, String>> objectResult = unnestJson(arrNode);
+    for (Map<String, String> flattenedObject : objectResult) {
+      Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap);
+      for (Map.Entry<String, String> entry : flattenedObject.entrySet()) {
+        flattenedObjectCopy.put(arrNodeKey + "." + entry.getKey(), entry.getValue());
+      }
+      resultList.add(flattenedObjectCopy);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // nothing to close here: the intermediate streams are closed in seal() and Writer.finish()
+  }
+
+  private class Writer {
+    private DataOutputStream _dictionaryHeaderWriter;
+    private DataOutputStream _dictionaryOffsetWriter;
+    private File _dictionaryOffsetFile;
+    private DataOutputStream _dictionaryWriter;
+    private DataOutputStream _invertedIndexOffsetWriter;
+    private File _invertedIndexOffsetFile;
+    private DataOutputStream _invertedIndexWriter;
+    private int _dictId;
+    private int _dictOffset;
+    private int _invertedIndexOffset;
+    int _maxDictionaryValueLength = Integer.MIN_VALUE;
+
+    public Writer(File dictionaryheaderFile, File dictionaryOffsetFile, File dictionaryFile,
+        File invertedIndexOffsetFile, File invertedIndexFile)
+        throws IOException {
+      _dictionaryHeaderWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryheaderFile)));
+
+      _dictionaryOffsetWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryOffsetFile)));
+      _dictionaryOffsetFile = dictionaryOffsetFile;
+      _dictionaryWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dictionaryFile)));
+      _invertedIndexOffsetWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexOffsetFile)));
+      _invertedIndexOffsetFile = invertedIndexOffsetFile;
+      _invertedIndexWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(invertedIndexFile)));
+      _dictId = 0;
+      _dictOffset = 0;
+      _invertedIndexOffset = 0;
+    }
+
+    public void add(byte[] key, RoaringBitmap roaringBitmap)
+        throws IOException {
+      if (key.length > _maxDictionaryValueLength) {
+        _maxDictionaryValueLength = key.length;
+      }
+      //write the key to dictionary
+      _dictionaryOffsetWriter.writeInt(_dictOffset);
+      _dictionaryWriter.write(key);
+
+      //write the roaringBitmap to inverted index
+      _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+
+      int serializedSizeInBytes = roaringBitmap.serializedSizeInBytes();
+      byte[] serializedRoaringBitmap = new byte[serializedSizeInBytes];
+      ByteBuffer serializedRoaringBitmapBuffer = ByteBuffer.wrap(serializedRoaringBitmap);
+      roaringBitmap.serialize(serializedRoaringBitmapBuffer);
+      _invertedIndexWriter.write(serializedRoaringBitmap);
+      System.out.println(
+          "dictId = " + _dictId + ", dict offset:" + _dictOffset + ", valueLength:" + key.length + ", inv offset:"
+              + _invertedIndexOffset + ", serializedSizeInBytes:" + serializedSizeInBytes);
+
+      //increment offsets
+      _dictOffset = _dictOffset + key.length;
+      _invertedIndexOffset = _invertedIndexOffset + serializedSizeInBytes;
+      //increment the dictionary id
+      _dictId = _dictId + 1;
+    }
+
+    void finish()
+        throws IOException {
+      //InvertedIndexReader and VarLengthBytesValueReaderWriter need one extra offset entry, since the length for index i is computed as offset[i + 1] - offset[i]
+      _invertedIndexOffsetWriter.writeInt(_invertedIndexOffset);
+      _dictionaryOffsetWriter.writeInt(_dictOffset);
+
+      byte[] headerBytes = VarLengthBytesValueReaderWriter.getHeaderBytes(_dictId);
+      _dictionaryHeaderWriter.write(headerBytes);
+      System.out.println("headerBytes = " + Arrays.toString(headerBytes));
+
+      _dictionaryHeaderWriter.close();
+      _dictionaryOffsetWriter.close();
+      _dictionaryWriter.close();
+      _invertedIndexOffsetWriter.close();
+      _invertedIndexWriter.close();
+
+      //Data offsets start at zero, but the final file stores (header + offsets + data), so each offset must be shifted by size(header) + size(offsets)
+      PinotDataBuffer dictionaryOffsetBuffer = PinotDataBuffer
+          .mapFile(_dictionaryOffsetFile, false, 0, _dictionaryOffsetFile.length(), ByteOrder.BIG_ENDIAN,
+              "dictionary offset file");
+      int dictOffsetBase = _dictionaryHeaderWriter.size() + _dictionaryOffsetWriter.size();
+      for (int i = 0; i < _dictId + 1; i++) {
+        int offset = dictionaryOffsetBuffer.getInt(i * Integer.BYTES);
+        int newOffset = offset + dictOffsetBase;
+        dictionaryOffsetBuffer.putInt(i * Integer.BYTES, offset + dictOffsetBase);
+        System.out.println("dictId = " + i + ", offset = " + offset + ", newOffset = " + newOffset);
+      }
+
+      PinotDataBuffer invIndexOffsetBuffer = PinotDataBuffer
+          .mapFile(_invertedIndexOffsetFile, false, 0, _invertedIndexOffsetFile.length(), ByteOrder.BIG_ENDIAN,
+              "invertedIndexOffsetFile");
+      int invIndexOffsetBase = _invertedIndexOffsetWriter.size();
+      for (int i = 0; i < _dictId + 1; i++) {
+        int offset = invIndexOffsetBuffer.getInt(i * Integer.BYTES);
+        int newOffset = offset + invIndexOffsetBase;
+        System.out.println("offset = " + offset + ", newOffset = " + newOffset);
+
+        invIndexOffsetBuffer.putInt(i * Integer.BYTES, newOffset);
+      }
+
+      invIndexOffsetBuffer.close();
+      dictionaryOffsetBuffer.close();
+    }
+
+    public int getMaxDictionaryValueLength() {
+      return _maxDictionaryValueLength;
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
+    String json1 =
+        " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\"  } }";
+    String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
+    String json3 = "{\n" + "  \"name\" : \"adam\",\n" + "  \"age\" : 30,\n" + "  \"country\" : \"us\",\n"
+        + "  \"addresses\" : [{\n" + "    \"number\" : 1,\n" + "    \"street\" : \"main st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 2,\n" + "    \"street\" : \"second st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 3,\n" + "    \"street\" : \"third st\",\n"
+        + "    \"country\" : \"us\"\n" + "  }]\n" + "}\n";
+
+    String json4 =
+        "{\n" + "    \"year\": [\n" + "        2018\n" + "    ],\n" + "    \"customers\": [\n" + "        {\n"
+            + "            \"name\": \"John\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"333-3334\"\n"
+            + "                }\n" + "            ]\n" + "        },\n" + "        {\n"
+            + "            \"name\": \"Jane\",\n" + "            \"contact\": [\n" + "                {\n"
+            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"555-5556\"\n"
+            + "                }\n" + "            ],\n" + "            \"surname\": \"Shaw\"\n" + "        }\n"
+            + "    ]\n" + "}";
+
+    String json5 = "{ \n" + "  \"accounting\" : [   \n" + "                     { \"firstName\" : \"John\",  \n"
+        + "                       \"lastName\"  : \"Doe\",\n" + "                       \"age\"       : 23 },\n" + "\n"
+        + "                     { \"firstName\" : \"Mary\",  \n" + "                       \"lastName\"  : \"Smith\",\n"
+        + "                        \"age\"      : 32 }\n" + "                 ],                            \n"
+        + "  \"sales\"      : [ \n" + "                     { \"firstName\" : \"Sally\", \n"
+        + "                       \"lastName\"  : \"Green\",\n" + "                        \"age\"      : 27 },\n"
+        + "\n" + "                     { \"firstName\" : \"Jim\",   \n"
+        + "                       \"lastName\"  : \"Galley\",\n" + "                       \"age\"       : 41 }\n"
+        + "                 ] \n" + "} ";
+
+    String json = json3;
+    System.out.println("json = " + json);
+    JsonNode rawJsonNode = new ObjectMapper().readTree(json);
+
+    System.out.println(
+        "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
+    String flattenJson = JsonFlattener.flatten(json);
+
+    System.out.println("flattenJson = " + flattenJson);
+    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
+
+    System.out
+        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
+    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
+    FieldSpec fieldSpec = new DimensionFieldSpec();
+    fieldSpec.setName("person");
+    File tempDir = Files.createTempDir();
+    JSONIndexCreator creator = new JSONIndexCreator(tempDir, fieldSpec, FieldSpec.DataType.BYTES);
+    List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
+    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
+    creator.add(json.getBytes());
+
+    creator.seal();
+    System.out.println("Output Dir = " + tempDir);
+    System.out.println("FileUtils.listFiles(tempDir, null, true) = " + FileUtils.listFiles(tempDir, null, true).stream()
+        .map(file -> file.getName()).collect(Collectors.toList()));
+
+    //Test reader
+    PinotDataBuffer buffer =
+        PinotDataBuffer.mapReadOnlyBigEndianFile(new File(tempDir, fieldSpec.getName() + ".nested.idx"));
+    JSONIndexReader reader = new JSONIndexReader(buffer);
+    ExpressionContext lhs = ExpressionContext.forIdentifier("person");
+    Predicate predicate = new EqPredicate(lhs, "addresses.street" + POSTING_LIST_KEY_SEPARATOR + "third st");
+    MutableRoaringBitmap matchingDocIds = reader.getMatchingDocIds(predicate);
+    System.out.println("matchingDocIds = " + matchingDocIds);
+
+    //Test filter operator
+    FilterContext filterContext = QueryContextConverterUtils
+        .getFilter(CalciteSqlParser.compileToExpression("name='adam' AND addresses.street='main st'"));
+    int numDocs = 1;
+    JSONMatchFilterOperator operator = new JSONMatchFilterOperator("person", filterContext, reader, numDocs);
+    FilterBlock filterBlock = operator.nextBlock();
+    BlockDocIdIterator iterator = filterBlock.getBlockDocIdSet().iterator();
+    int docId = -1;
+    while ((docId = iterator.next()) >= 0) {
+      System.out.println("docId = " + docId);
+    }
+  }
+}
\ No newline at end of file
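
As a reading aid: for json3 in the main() above, unnestJson() yields one
flattened map per addresses element (keys in TreeMap order), so a lookup like
"addresses.street|third st" hits exactly one flattened record:

    0: {addresses.country=us, addresses.number=1, addresses.street=main st, age=30, country=us, name=adam}
    1: {addresses.country=us, addresses.number=2, addresses.street=second st, age=30, country=us, name=adam}
    2: {addresses.country=us, addresses.number=3, addresses.street=third st, age=30, country=us, name=adam}

All three flattened records map back to root docId 0 through the
flattenedDocId -> rootDocId buffer.
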
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
deleted file mode 100644
index adfa19d..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/NestedObjectIndexCreator.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.creator.impl.inv;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.wnameless.json.flattener.JsonFlattener;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.pinot.spi.data.FieldSpec;
-
-
-public class NestedObjectIndexCreator {
-
-  public NestedObjectIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType) {
-
-  }
-
-  private static List<Map<String, String>> unnestJson(JsonNode root) {
-    Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
-    Map<String, String> flattenedSingleValuesMap = new TreeMap<>();
-    Map<String, JsonNode> arrNodes = new TreeMap<>();
-    Map<String, JsonNode> objectNodes = new TreeMap<>();
-    List<Map<String, String>> resultList = new ArrayList<>();
-    List<Map<String, String>> tempResultList = new ArrayList<>();
-    while (fields.hasNext()) {
-      Map.Entry<String, JsonNode> child = fields.next();
-      if (child.getValue().isValueNode()) {
-        //Normal value node
-        flattenedSingleValuesMap.put(child.getKey(), child.getValue().asText());
-      } else if (child.getValue().isArray()) {
-        //Array Node: Process these nodes later
-        arrNodes.put(child.getKey(), child.getValue());
-      } else {
-        //Object Node
-        objectNodes.put(child.getKey(), child.getValue());
-      }
-    }
-    for (String objectNodeKey : objectNodes.keySet()) {
-      JsonNode objectNode = objectNodes.get(objectNodeKey);
-      modifyKeysInMap(flattenedSingleValuesMap, tempResultList, objectNodeKey, objectNode);
-    }
-    if (tempResultList.isEmpty()) {
-      tempResultList.add(flattenedSingleValuesMap);
-    }
-    if (!arrNodes.isEmpty()) {
-      for (Map<String, String> flattenedMapElement : tempResultList) {
-        for (String arrNodeKey : arrNodes.keySet()) {
-          JsonNode arrNode = arrNodes.get(arrNodeKey);
-          for (JsonNode arrNodeElement : arrNode) {
-            modifyKeysInMap(flattenedMapElement, resultList, arrNodeKey, arrNodeElement);
-          }
-        }
-      }
-    } else {
-      resultList.addAll(tempResultList);
-    }
-    return resultList;
-  }
-
-  private static void modifyKeysInMap(Map<String, String> flattenedMap, List<Map<String, String>> resultList,
-      String arrNodeKey, JsonNode arrNode) {
-    List<Map<String, String>> objectResult = unnestJson(arrNode);
-    for (Map<String, String> flattenedObject : objectResult) {
-      Map<String, String> flattenedObjectCopy = new TreeMap<>(flattenedMap);
-      for (Map.Entry<String, String> entry : flattenedObject.entrySet()) {
-        flattenedObjectCopy.put(arrNodeKey + "." + entry.getKey(), entry.getValue());
-      }
-      resultList.add(flattenedObjectCopy);
-    }
-  }
-
-  public void add(byte[] data)
-      throws IOException {
-
-    JsonNode jsonNode = new ObjectMapper().readTree(data);
-    List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
-    for (Map<String, String> map : flattenedMapList) {
-
-    }
-  }
-
-  public static void main(String[] args)
-      throws IOException {
-
-    String json0 = " { \"a\" : { \"b\" : 1, \"c\": 3, \"d\": [{\"x\" : 1}, {\"y\" : 1}] }, \"e\": \"f\", \"g\":2.3 }";
-    String json1 =
-        " { \"name\" : \"adam\", \"age\": 30, \"country\": \"us\", \"address\": {\"number\" : 112, \"street\": \"main st\", \"country\": \"us\"  } }";
-    String json2 = " { \"name\" : \"adam\", \"age\": 30 }";
-    String json3 = "{\n" + "  \"name\" : \"adam\",\n" + "  \"age\" : 30,\n" + "  \"country\" : \"us\",\n"
-        + "  \"addresses\" : [{\n" + "    \"number\" : 1,\n" + "    \"street\" : \"main st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 2,\n" + "    \"street\" : \"second st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }, {\n" + "    \"number\" : 3,\n" + "    \"street\" : \"third st\",\n"
-        + "    \"country\" : \"us\"\n" + "  }]\n" + "}\n";
-
-    String json4 =
-        "{\n" + "    \"year\": [\n" + "        2018\n" + "    ],\n" + "    \"customers\": [\n" + "        {\n"
-            + "            \"name\": \"John\",\n" + "            \"contact\": [\n" + "                {\n"
-            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"333-3334\"\n"
-            + "                }\n" + "            ]\n" + "        },\n" + "        {\n"
-            + "            \"name\": \"Jane\",\n" + "            \"contact\": [\n" + "                {\n"
-            + "                    \"phone\": \"home\",\n" + "                    \"number\": \"555-5556\"\n"
-            + "                }\n" + "            ],\n" + "            \"surname\": \"Shaw\"\n" + "        }\n"
-            + "    ]\n" + "}";
-
-    String json5 = "{ \n" + "  \"accounting\" : [   \n" + "                     { \"firstName\" : \"John\",  \n"
-        + "                       \"lastName\"  : \"Doe\",\n" + "                       \"age\"       : 23 },\n" + "\n"
-        + "                     { \"firstName\" : \"Mary\",  \n" + "                       \"lastName\"  : \"Smith\",\n"
-        + "                        \"age\"      : 32 }\n" + "                 ],                            \n"
-        + "  \"sales\"      : [ \n" + "                     { \"firstName\" : \"Sally\", \n"
-        + "                       \"lastName\"  : \"Green\",\n" + "                        \"age\"      : 27 },\n"
-        + "\n" + "                     { \"firstName\" : \"Jim\",   \n"
-        + "                       \"lastName\"  : \"Galley\",\n" + "                       \"age\"       : 41 }\n"
-        + "                 ] \n" + "} ";
-
-    String json = json3;
-    System.out.println("json = " + json);
-    JsonNode rawJsonNode = new ObjectMapper().readTree(json);
-
-    System.out.println(
-        "rawJsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(rawJsonNode));
-    String flattenJson = JsonFlattener.flatten(json);
-
-    System.out.println("flattenJson = " + flattenJson);
-    JsonNode jsonNode = new ObjectMapper().readTree(flattenJson);
-
-    System.out
-        .println("jsonNode = " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
-    Map<String, Object> stringObjectMap = JsonFlattener.flattenAsMap(json);
-    System.out.println("JsonFlattener.flattenAsMap(json) = " + stringObjectMap);
-    NestedObjectIndexCreator creator = new NestedObjectIndexCreator(null, null, null);
-    List<Map<String, String>> maps = creator.unnestJson(rawJsonNode);
-    System.out.println("maps = " + maps.toString().replaceAll("},", "}\n"));
-//    creator.add(json.getBytes());
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index a6817a0..ecf7e30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -47,6 +47,7 @@ public class IndexLoadingConfig {
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
+  private Set<String> _jsonIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
   private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
@@ -92,6 +93,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+    if (jsonIndexColumns != null) {
+      _jsonIndexColumns.addAll(jsonIndexColumns);
+    }
+
     List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
     if (rangeIndexColumns != null) {
       _rangeIndexColumns.addAll(rangeIndexColumns);
@@ -222,6 +228,10 @@ public class IndexLoadingConfig {
     return _invertedIndexColumns;
   }
 
+  public Set<String> getJsonIndexColumns() {
+    return _jsonIndexColumns;
+  }
+
   public Set<String> getRangeIndexColumns() {
     return _rangeIndexColumns;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..06a7adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.JSONIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -113,6 +114,11 @@ public class SegmentPreProcessor implements AutoCloseable {
           new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
       rangeIndexHandler.createRangeIndices();
 
+      // Create json indices according to the index config.
+      JSONIndexHandler jsonIndexHandler =
+          new JSONIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+      jsonIndexHandler.createJsonIndices();
+
       Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
       if (textIndexColumns.size() > 0) {
         TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
new file mode 100644
index 0000000..90ca344
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/JSONIndexHandler.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.inv.JSONIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class JSONIndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JSONIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _jsonIndexColumns = new HashSet<>();
+
+  public JSONIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Only create json index on unsorted columns; both dictionary-encoded and raw columns are handled below
+    for (String column : indexLoadingConfig.getJsonIndexColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
+        _jsonIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createJsonIndices()
+      throws IOException {
+    for (ColumnMetadata columnMetadata : _jsonIndexColumns) {
+      createJSONIndexForColumn(columnMetadata);
+    }
+  }
+
+  private void createJSONIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION + ".inprogress");
+    File jsonIndexFile = new File(_indexDir, column + JSON_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.JSON_INDEX)) {
+        // Skip creating json index if already exists.
+
+        LOGGER.info("Found json index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+
+      // Remove json index if exists.
+      // For v1 and v2, it's the actual json index. For v3, it's the temporary json index.
+      FileUtils.deleteQuietly(jsonIndexFile);
+    }
+
+    // Create new json index for the column.
+    LOGGER.info("Creating new json index for segment: {}, column: {}", _segmentName, column);
+    if (columnMetadata.hasDictionary()) {
+      handleDictionaryBasedColumn(columnMetadata);
+    } else {
+      handleNonDictionaryBasedColumn(columnMetadata);
+    }
+
+    // For v3, write the generated json index file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created json index for segment: {}, column: {}", _segmentName, column);
+  }
+
+  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter);
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            FieldSpec.DataType.BYTES)) {
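+      // Values are fed to the creator as raw bytes from the dictionary, hence the BYTES type above.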
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column
+        for (int i = 0; i < numDocs; i++) {
+          int dictId = forwardIndexReader.getDictId(i, readerContext);
+          jsonIndexCreator.add(dictionary.getBytesValue(dictId));
+        }
+      } else {
+        // Multi-value column
+        throw new IllegalStateException("JSON Indexing is not supported on multi-valued columns ");
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    FieldSpec.DataType dataType = columnMetadata.getDataType();
+    if (dataType != FieldSpec.DataType.BYTES && dataType != FieldSpec.DataType.STRING) {
+      throw new UnsupportedOperationException(
+          "JSON indexing is only supported for STRING/BYTES data types but found: " + dataType);
+    }
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        JSONIndexCreator jsonIndexCreator = new JSONIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            dataType)) {
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column.
+        switch (dataType) {
+          case STRING:
+          case BYTES:
+            for (int i = 0; i < numDocs; i++) {
+              jsonIndexCreator.add(forwardIndexReader.getBytes(i, readerContext));
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + dataType);
+        }
+      } else {
+        // Multi-value column
+        throw new IllegalStateException("JSON indexing is not supported on multi-value columns");
+      }
+      jsonIndexCreator.seal();
+    }
+  }
+
+  private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+    if (!columnMetadata.hasDictionary()) {
+      // Raw (non-dictionary) STRING/BYTES columns are stored as var-byte chunks.
+      return new VarByteChunkSVForwardIndexReader(buffer, columnMetadata.getDataType());
+    }
+    // Dictionary-encoded columns store bit-packed dict ids.
+    int numRows = columnMetadata.getTotalDocs();
+    int numBitsPerValue = columnMetadata.getBitsPerElement();
+    if (columnMetadata.isSingleValue()) {
+      return new FixedBitSVForwardIndexReader(buffer, numRows, numBitsPerValue);
+    } else {
+      return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
+          numBitsPerValue);
+    }
+  }
+
+  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    return PhysicalColumnIndexContainer.loadDictionary(buffer, columnMetadata, false);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
new file mode 100644
index 0000000..efb6c8b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/JSONIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class JSONIndexReader implements Closeable {
+
+  private static final int EXPECTED_VERSION = 1;
+  private static final int DICT_HEADER_INDEX = 0;
+  private static final int DICT_OFFSET_INDEX = 1;
+  private static final int DICT_DATA_INDEX = 2;
+  private static final int INV_OFFSET_INDEX = 3;
+  private static final int INV_DATA_INDEX = 4;
+  private static final int FLATTENED_2_ROOT_INDEX = 5;
+
+  private final BitmapInvertedIndexReader invertedIndexReader;
+  private final StringDictionary dictionary;
+  private final long cardinality;
+  private final long numFlattenedDocs;
+
+  public JSONIndexReader(PinotDataBuffer pinotDataBuffer) {
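+    // Header layout (as written by JSONIndexCreator):
+    //   int version | int maxKeyLength | (long startOffset, long size) per buffer x 6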
+    int version = pinotDataBuffer.getInt(0);
+    int maxKeyLength = pinotDataBuffer.getInt(Integer.BYTES);
+
+    Preconditions.checkState(version == EXPECTED_VERSION,
+        "Index version: %s is not supported by this reader, expected version: %s", version, EXPECTED_VERSION);
+
+    // dictionaryHeaderFile, dictionaryOffsetFile, dictionaryFile, invIndexOffsetFile, invIndexFile, FlattenedDocId2DocIdMappingFile
+    int numBuffers = 6;
+    long[] bufferStartOffsets = new long[numBuffers];
+    long[] bufferSizeArray = new long[numBuffers];
+    for (int i = 0; i < numBuffers; i++) {
+      bufferStartOffsets[i] = pinotDataBuffer.getLong(2 * Integer.BYTES + 2 * i * Long.BYTES);
+      bufferSizeArray[i] = pinotDataBuffer.getLong(2 * Integer.BYTES + 2 * i * Long.BYTES + Long.BYTES);
+    }
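+    // The dictionary offset buffer stores one int offset per entry plus a trailing end
+    // offset (hence the -1); the flattened-to-root buffer stores one int per flattened doc.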
+    cardinality = bufferSizeArray[DICT_OFFSET_INDEX] / Integer.BYTES - 1;
+    numFlattenedDocs = bufferSizeArray[FLATTENED_2_ROOT_INDEX] / Integer.BYTES;
+
+    long dictionaryStartOffset = bufferStartOffsets[DICT_HEADER_INDEX];
+    long dictionarySize =
+        bufferSizeArray[DICT_HEADER_INDEX] + bufferSizeArray[DICT_OFFSET_INDEX] + bufferSizeArray[DICT_DATA_INDEX];
+
+    PinotDataBuffer dictionaryBuffer =
+        pinotDataBuffer.view(dictionaryStartOffset, dictionaryStartOffset + dictionarySize);
+    dictionary = new StringDictionary(dictionaryBuffer, (int) cardinality, maxKeyLength, Byte.valueOf("0"));
+
+    long invIndexStartOffset = bufferStartOffsets[INV_OFFSET_INDEX];
+    long invIndexSize = bufferSizeArray[INV_OFFSET_INDEX] + bufferSizeArray[INV_DATA_INDEX];
+
+    PinotDataBuffer invIndexBuffer = pinotDataBuffer.view(invIndexStartOffset, invIndexStartOffset + invIndexSize);
+    invertedIndexReader = new BitmapInvertedIndexReader(invIndexBuffer, (int) cardinality);
+  }
+
+  /**
+   * Returns the matching flattened document ids for the given predicate.
+   */
+  public MutableRoaringBitmap getMatchingDocIds(Predicate predicate) {
+
+    PredicateEvaluator predicateEvaluator =
+        PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dictionary, FieldSpec.DataType.BYTES);
+    boolean exclusive = predicateEvaluator.isExclusive();
+    int[] dictIds = exclusive ? predicateEvaluator.getNonMatchingDictIds() : predicateEvaluator.getMatchingDictIds();
+    int numDictIds = dictIds.length;
+
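+    // For exclusive predicates, union the posting lists of the non-matching dict ids and
+    // flip the result over the flattened doc id space [0, numFlattenedDocs).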
+    if (numDictIds == 1) {
+      ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[0]);
+      if (exclusive) {
+        // Flip into a copy rather than in place: the reader may hand back a shared bitmap.
+        return ImmutableRoaringBitmap.flip(docIds, 0L, numFlattenedDocs);
+      } else {
+        return docIds.toMutableRoaringBitmap();
+      }
+    } else {
+      ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numDictIds];
+      for (int i = 0; i < numDictIds; i++) {
+        bitmaps[i] = (ImmutableRoaringBitmap) invertedIndexReader.getDocIds(dictIds[i]);
+      }
+      MutableRoaringBitmap docIds = ImmutableRoaringBitmap.or(bitmaps);
+      if (exclusive) {
+        docIds.flip(0L, numFlattenedDocs);
+      }
+      return docIds;
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
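+    // No-op: the underlying PinotDataBuffer is owned and closed by the caller.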
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..f9063b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,7 +25,8 @@ public enum ColumnIndexType {
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
   TEXT_INDEX("text_index"),
-  RANGE_INDEX("range_index");
+  RANGE_INDEX("range_index"),
+  JSON_INDEX("json_index");
 
   private final String indexName;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 51dd2fb..4ffc133 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -135,6 +136,10 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case TEXT_INDEX:
         filename = column + LuceneTextIndexCreator.LUCENE_TEXT_INDEX_FILE_EXTENSION;
         break;
+      case JSON_INDEX:
+        filename = column + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+        break;
       default:
         throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 3dd137b..7965654 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
   private List<String> _rangeIndexColumns;
+  private List<String> _jsonIndexColumns;
   private boolean _autoGeneratedInvertedIndex;
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
@@ -71,6 +72,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _rangeIndexColumns = rangeIndexColumns;
   }
 
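+  /**
+   * Returns the columns to create the json index on. For reference, a minimal table config
+   * sketch enabling the index (the column name "person" here is hypothetical):
+   * <pre>
+   *   "tableIndexConfig": { "jsonIndexColumns": ["person"] }
+   * </pre>
+   */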
+  public List<String> getJsonIndexColumns() {
+    return _jsonIndexColumns;
+  }
+
+  public void setJsonIndexColumns(List<String> jsonIndexColumns) {
+    _jsonIndexColumns = jsonIndexColumns;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..c9c245b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -337,7 +337,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec> {
   public enum DataType {
     // LIST is for complex lists which is different from multi-value column of primitives
     // STRUCT, MAP and LIST are composable to form a COMPLEX field
-    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST;
+    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES, STRUCT, MAP, LIST, JSON;
 
     /**
      * Returns the data type stored in Pinot.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org