You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/10 23:57:40 UTC

[pinot] branch master updated: Add more options to json index (#9543)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 65e01f1307 Add more options to json index (#9543)
65e01f1307 is described below

commit 65e01f1307c349d7cf46f257e0bb76d5f23d0b2d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Oct 10 16:57:36 2022 -0700

    Add more options to json index (#9543)
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   2 +-
 .../statement/JsonStatementOptimizer.java          |  14 +-
 .../combine/SelectionCombineOperatorTest.java      |   2 +-
 .../mutable/DefaultMutableIndexProvider.java       |   2 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   6 +-
 .../local/realtime/impl/RealtimeSegmentConfig.java |  32 +-
 .../realtime/impl/json/MutableJsonIndexImpl.java   |  27 +-
 .../creator/impl/DefaultIndexCreatorProvider.java  |   6 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  14 +-
 .../impl/inv/json/BaseJsonIndexCreator.java        |  14 +-
 .../impl/inv/json/OffHeapJsonIndexCreator.java     |  15 +-
 .../impl/inv/json/OnHeapJsonIndexCreator.java      |   5 +-
 .../index/column/PhysicalColumnIndexContainer.java |   2 +-
 .../segment/index/loader/IndexLoadingConfig.java   |  35 +-
 .../loader/invertedindex/JsonIndexHandler.java     |  22 +-
 .../segment/local/utils/TableConfigUtils.java      |  25 +-
 .../indexsegment/mutable/IndexingFailureTest.java  |  15 +-
 .../mutable/MutableSegmentImplTestUtils.java       |  20 +-
 .../segment/local/segment/index/JsonIndexTest.java |  20 +-
 .../segment/spi/creator/IndexCreationContext.java  |  13 +-
 .../spi/creator/SegmentGeneratorConfig.java        |  20 +-
 .../mutable/provider/MutableIndexContext.java      |  13 +-
 .../pinot/spi/config/table/IndexingConfig.java     |  10 +
 .../pinot/spi/config/table/JsonIndexConfig.java    | 104 ++++
 .../java/org/apache/pinot/spi/utils/JsonUtils.java | 144 ++++--
 .../org/apache/pinot/spi/utils/JsonUtilsTest.java  | 536 ++++++++++++++++-----
 26 files changed, 837 insertions(+), 281 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6f59c79709..5f2d733fe6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1346,7 +1346,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(indexLoadingConfig.getTextIndexColumns())
             .setFSTIndexColumns(indexLoadingConfig.getFSTIndexColumns())
-            .setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
+            .setJsonIndexConfigs(indexLoadingConfig.getJsonIndexConfigs())
             .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java
index 8be4d2147c..138e55b3b3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
@@ -37,6 +38,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -436,12 +438,18 @@ public class JsonStatementOptimizer implements StatementOptimizer {
       return false;
     }
 
+    // Ignore jsonIndexColumns when jsonIndexConfigs is configured
+    Map<String, JsonIndexConfig> jsonIndexConfigs = indexingConfig.getJsonIndexConfigs();
+    if (jsonIndexConfigs != null) {
+      return jsonIndexConfigs.containsKey(columnName);
+    }
+
     List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
-    if (jsonIndexColumns == null) {
-      return false;
+    if (jsonIndexColumns != null) {
+      return jsonIndexColumns.contains(columnName);
     }
 
-    return jsonIndexColumns.contains(columnName);
+    return false;
   }
 
   /** @return symbolic representation of function operator delimited by spaces. */
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index 81947e25d0..e8db2051bf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -116,7 +116,7 @@ public class SelectionCombineOperatorTest {
     RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder()
         .setTableNameWithType(REALTIME_TABLE_NAME).setSegmentName(segmentName).setSchema(SCHEMA).setCapacity(100000)
         .setAvgNumMultiValues(2).setNoDictionaryColumns(Collections.emptySet())
-        .setJsonIndexColumns(Collections.emptySet()).setVarLengthDictionaryColumns(Collections.emptySet())
+        .setJsonIndexConfigs(Collections.emptyMap()).setVarLengthDictionaryColumns(Collections.emptySet())
         .setInvertedIndexColumns(Collections.emptySet()).setSegmentZKMetadata(new SegmentZKMetadata(segmentName))
         .setMemoryManager(new DirectMemoryManager(segmentName)).setStatsHistory(statsHistory).setAggregateMetrics(false)
         .setNullHandlingEnabled(true).setIngestionAggregationConfigs(Collections.emptyList()).build();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
index 9a03397157..820c878c4e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
@@ -105,7 +105,7 @@ public class DefaultMutableIndexProvider implements MutableIndexProvider {
 
   @Override
   public MutableJsonIndex newJsonIndex(MutableIndexContext.Json context) {
-    return new MutableJsonIndexImpl();
+    return new MutableJsonIndexImpl(context.getJsonIndexConfig());
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index a91076b093..6e03a0e878 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -90,6 +90,7 @@ import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
@@ -244,7 +245,7 @@ public class MutableSegmentImpl implements MutableSegment {
     Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
     Set<String> textIndexColumns = config.getTextIndexColumns();
     Set<String> fstIndexColumns = config.getFSTIndexColumns();
-    Set<String> jsonIndexColumns = config.getJsonIndexColumns();
+    Map<String, JsonIndexConfig> jsonIndexConfigs = config.getJsonIndexConfigs();
     Map<String, H3IndexConfig> h3IndexConfigs = config.getH3IndexConfigs();
 
     int avgNumMultiValues = config.getAvgNumMultiValues();
@@ -354,8 +355,9 @@ public class MutableSegmentImpl implements MutableSegment {
       }
 
       // Json index
+      JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(column);
       MutableJsonIndex jsonIndex =
-          jsonIndexColumns.contains(column) ? indexProvider.newJsonIndex(context.forJsonIndex()) : null;
+          jsonIndexConfig != null ? indexProvider.newJsonIndex(context.forJsonIndex(jsonIndexConfig)) : null;
 
       // H3 index
       // TODO consider making this overridable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 19886adcca..9af8367b49 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -49,7 +50,7 @@ public class RealtimeSegmentConfig {
   private final Set<String> _invertedIndexColumns;
   private final Set<String> _textIndexColumns;
   private final Set<String> _fstIndexColumns;
-  private final Set<String> _jsonIndexColumns;
+  private final Map<String, JsonIndexConfig> _jsonIndexConfigs;
   private final Map<String, H3IndexConfig> _h3IndexConfigs;
   private final SegmentZKMetadata _segmentZKMetadata;
   private final boolean _offHeap;
@@ -72,11 +73,11 @@ public class RealtimeSegmentConfig {
   private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
       String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
-      Set<String> fstIndexColumns, Set<String> jsonIndexColumns, Map<String, H3IndexConfig> h3IndexConfigs,
-      SegmentZKMetadata segmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
-      RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
-      int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir,
-      UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
+      Set<String> fstIndexColumns, Map<String, JsonIndexConfig> jsonIndexConfigs,
+      Map<String, H3IndexConfig> h3IndexConfigs, SegmentZKMetadata segmentZKMetadata, boolean offHeap,
+      PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
+      PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
+      String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
       PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
       List<AggregationConfig> ingestionAggregationConfigs) {
@@ -92,7 +93,7 @@ public class RealtimeSegmentConfig {
     _invertedIndexColumns = invertedIndexColumns;
     _textIndexColumns = textIndexColumns;
     _fstIndexColumns = fstIndexColumns;
-    _jsonIndexColumns = jsonIndexColumns;
+    _jsonIndexConfigs = jsonIndexConfigs;
     _h3IndexConfigs = h3IndexConfigs;
     _segmentZKMetadata = segmentZKMetadata;
     _offHeap = offHeap;
@@ -165,8 +166,8 @@ public class RealtimeSegmentConfig {
     return _fstIndexColumns;
   }
 
-  public Set<String> getJsonIndexColumns() {
-    return _jsonIndexColumns;
+  public Map<String, JsonIndexConfig> getJsonIndexConfigs() {
+    return _jsonIndexConfigs;
   }
 
   public Map<String, H3IndexConfig> getH3IndexConfigs() {
@@ -254,7 +255,7 @@ public class RealtimeSegmentConfig {
     private Set<String> _invertedIndexColumns;
     private Set<String> _textIndexColumns = new HashSet<>();
     private Set<String> _fstIndexColumns = new HashSet<>();
-    private Set<String> _jsonIndexColumns = new HashSet<>();
+    private Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
     private Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
     private SegmentZKMetadata _segmentZKMetadata;
     private boolean _offHeap;
@@ -344,8 +345,8 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
-    public Builder setJsonIndexColumns(Set<String> jsonIndexColumns) {
-      _jsonIndexColumns = jsonIndexColumns;
+    public Builder setJsonIndexConfigs(Map<String, JsonIndexConfig> jsonIndexConfigs) {
+      _jsonIndexConfigs = jsonIndexConfigs;
       return this;
     }
 
@@ -437,11 +438,10 @@ public class RealtimeSegmentConfig {
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
           _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
-          _textIndexColumns, _fstIndexColumns, _jsonIndexColumns, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
+          _textIndexColumns, _fstIndexColumns, _jsonIndexConfigs, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
-          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn,
-          _partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
-          _ingestionAggregationConfigs);
+          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager,
+          _partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 49588358e5..c8a0bfc166 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.json.BaseJsonIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -49,6 +50,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  * Json index for mutable segment.
  */
 public class MutableJsonIndexImpl implements MutableJsonIndex {
+  private final JsonIndexConfig _jsonIndexConfig;
   private final Map<String, RoaringBitmap> _postingListMap;
   private final IntList _docIdMapping;
   private final ReentrantReadWriteLock.ReadLock _readLock;
@@ -57,7 +59,8 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
   private int _nextDocId;
   private int _nextFlattenedDocId;
 
-  public MutableJsonIndexImpl() {
+  public MutableJsonIndexImpl(JsonIndexConfig jsonIndexConfig) {
+    _jsonIndexConfig = jsonIndexConfig;
     _postingListMap = new HashMap<>();
     _docIdMapping = new IntArrayList();
 
@@ -73,7 +76,8 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
   public void add(String jsonString)
       throws IOException {
     try {
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString));
+      List<Map<String, String>> flattenedRecords =
+          JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig);
       _writeLock.lock();
       try {
         addFlattenedRecords(flattenedRecords);
@@ -90,8 +94,8 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
    */
   private void addFlattenedRecords(List<Map<String, String>> records) {
     int numRecords = records.size();
-    Preconditions
-        .checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records", Integer.MAX_VALUE);
+    Preconditions.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records",
+        Integer.MAX_VALUE);
     for (int i = 0; i < numRecords; i++) {
       _docIdMapping.add(_nextDocId);
     }
@@ -125,15 +129,15 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
         // order to get the correct result, and it cannot be nested
         RoaringBitmap matchingFlattenedDocIds = getMatchingFlattenedDocIds(filter.getPredicate());
         MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-        matchingFlattenedDocIds
-            .forEach((IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
+        matchingFlattenedDocIds.forEach(
+            (IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
         matchingDocIds.flip(0, (long) _nextDocId);
         return matchingDocIds;
       } else {
         RoaringBitmap matchingFlattenedDocIds = getMatchingFlattenedDocIds(filter);
         MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-        matchingFlattenedDocIds
-            .forEach((IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
+        matchingFlattenedDocIds.forEach(
+            (IntConsumer) flattenedDocId -> matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
         return matchingDocIds;
       }
     } finally {
@@ -174,8 +178,8 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
       }
       case PREDICATE: {
         Predicate predicate = filter.getPredicate();
-        Preconditions
-            .checkArgument(!isExclusive(predicate.getType()), "Exclusive predicate: %s cannot be nested", predicate);
+        Preconditions.checkArgument(!isExclusive(predicate.getType()), "Exclusive predicate: %s cannot be nested",
+            predicate);
         return getMatchingFlattenedDocIds(predicate);
       }
       default:
@@ -192,8 +196,7 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
     ExpressionContext lhs = predicate.getLhs();
     Preconditions.checkArgument(lhs.getType() == ExpressionContext.Type.IDENTIFIER,
         "Left-hand side of the predicate must be an identifier, got: %s (%s). Put double quotes around the identifier"
-            + " if needed.",
-        lhs, lhs.getType());
+            + " if needed.", lhs, lhs.getType());
     String key = lhs.getIdentifier();
 
     // Support 2 formats:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index cd3a5603db..7cb36a4db6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -122,8 +122,10 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
         "Json index is currently only supported on single-value columns");
     Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
         "Json index is currently only supported on STRING columns");
-    return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName())
-        : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName());
+    return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+        context.getJsonIndexConfig())
+        : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+            context.getJsonIndexConfig());
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 7de25bcebc..d27b33e7a5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FSTType;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
@@ -167,11 +168,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       fstIndexColumns.add(columnName);
     }
 
-    Set<String> jsonIndexColumns = new HashSet<>();
-    for (String columnName : _config.getJsonIndexCreationColumns()) {
+    Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs();
+    for (String columnName : jsonIndexConfigs.keySet()) {
       Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create text index for column: %s because it is not in schema", columnName);
-      jsonIndexColumns.add(columnName);
+          "Cannot create json index for column: %s because it is not in schema", columnName);
     }
 
     Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
@@ -278,8 +278,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
                 (String[]) columnIndexCreationInfo.getSortedUniqueElementsArray())));
       }
 
-      if (jsonIndexColumns.contains(columnName)) {
-        _jsonIndexCreatorMap.put(columnName, _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex()));
+      JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(columnName);
+      if (jsonIndexConfig != null) {
+        _jsonIndexCreatorMap.put(columnName,
+            _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex(jsonIndexConfig)));
       }
 
       H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 88d74a7884..e2afca19fa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -34,6 +34,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.roaringbitmap.Container;
 import org.roaringbitmap.RoaringBitmap;
@@ -61,20 +62,21 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
   static final String DICTIONARY_FILE_NAME = "dictionary.buf";
   static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";
 
+  final JsonIndexConfig _jsonIndexConfig;
   final File _indexFile;
   final File _tempDir;
   final File _dictionaryFile;
   final File _invertedIndexFile;
   final IntList _numFlattenedRecordsList = new IntArrayList();
   final Map<String, RoaringBitmapWriter<RoaringBitmap>> _postingListMap = new TreeMap<>();
-  final RoaringBitmapWriter.Wizard<Container, RoaringBitmap> _bitmapWriterWizard =
-      RoaringBitmapWriter.writer();
+  final RoaringBitmapWriter.Wizard<Container, RoaringBitmap> _bitmapWriterWizard = RoaringBitmapWriter.writer();
 
   int _nextFlattenedDocId;
   int _maxValueLength;
 
-  BaseJsonIndexCreator(File indexDir, String columnName)
+  BaseJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig)
       throws IOException {
+    _jsonIndexConfig = jsonIndexConfig;
     _indexFile = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
     _tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
     if (_tempDir.exists()) {
@@ -89,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
   @Override
   public void add(String jsonString)
       throws IOException {
-    addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString)));
+    addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig));
   }
 
   /**
@@ -98,8 +100,8 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
   void addFlattenedRecords(List<Map<String, String>> records)
       throws IOException {
     int numRecords = records.size();
-    Preconditions
-        .checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records", Integer.MAX_VALUE);
+    Preconditions.checkState(_nextFlattenedDocId + numRecords >= 0, "Got more than %s flattened records",
+        Integer.MAX_VALUE);
     _numFlattenedRecordsList.add(numRecords);
     for (Map<String, String> record : records) {
       for (Map.Entry<String, String> entry : record.entrySet()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
index a136eccd31..30c5d2b63b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.io.util.VarLengthValueWriter;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -69,9 +70,9 @@ public class OffHeapJsonIndexCreator extends BaseJsonIndexCreator {
   private int _numPostingListsInLastChunk;
   private int _numPostingLists;
 
-  public OffHeapJsonIndexCreator(File indexDir, String columnName)
+  public OffHeapJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig)
       throws IOException {
-    super(indexDir, columnName);
+    super(indexDir, columnName, jsonIndexConfig);
     _postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME);
     _postingListOutputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(_postingListFile)));
   }
@@ -133,9 +134,8 @@ public class OffHeapJsonIndexCreator extends BaseJsonIndexCreator {
     }
 
     // Read the final posting list file and create the dictionary and inverted index file
-    try (PinotDataBuffer finalPostingListBuffer = PinotDataBuffer
-        .mapFile(finalPostingListFile, true, 0, finalPostingListFile.length(), ByteOrder.BIG_ENDIAN,
-            "Json index final posting list");
+    try (PinotDataBuffer finalPostingListBuffer = PinotDataBuffer.mapFile(finalPostingListFile, true, 0,
+        finalPostingListFile.length(), ByteOrder.BIG_ENDIAN, "Json index final posting list");
         VarLengthValueWriter dictionaryWriter = new VarLengthValueWriter(_dictionaryFile, _numPostingLists);
         BitmapInvertedIndexWriter invertedIndexWriter = new BitmapInvertedIndexWriter(_invertedIndexFile,
             _numPostingLists)) {
@@ -162,9 +162,8 @@ public class OffHeapJsonIndexCreator extends BaseJsonIndexCreator {
   private File createFinalPostingListFile(byte[] valueBytesBuffer)
       throws IOException {
     File finalPostingListFile = new File(_tempDir, FINAL_POSTING_LIST_FILE_NAME);
-    try (PinotDataBuffer postingListBuffer = PinotDataBuffer
-        .mapFile(_postingListFile, true, 0, _postingListFile.length(), ByteOrder.BIG_ENDIAN,
-            "Json index posting list")) {
+    try (PinotDataBuffer postingListBuffer = PinotDataBuffer.mapFile(_postingListFile, true, 0,
+        _postingListFile.length(), ByteOrder.BIG_ENDIAN, "Json index posting list")) {
       // Create chunk iterators from the posting list file
       int numChunks = _postingListChunkEndOffsets.size();
       List<ChunkIterator> chunkIterators = new ArrayList<>(numChunks);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
index bccaf1579e..bb45728c3f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.pinot.segment.local.io.util.VarLengthValueWriter;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.RoaringBitmapWriter;
 
@@ -38,9 +39,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  */
 public class OnHeapJsonIndexCreator extends BaseJsonIndexCreator {
 
-  public OnHeapJsonIndexCreator(File indexDir, String columnName)
+  public OnHeapJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig)
       throws IOException {
-    super(indexDir, columnName);
+    super(indexDir, columnName, jsonIndexConfig);
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 209f2040e8..beeb768a5c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -83,7 +83,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     boolean loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
     boolean loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
     boolean loadFSTIndex = indexLoadingConfig.getFSTIndexColumns().contains(columnName);
-    boolean loadJsonIndex = indexLoadingConfig.getJsonIndexColumns().contains(columnName);
+    boolean loadJsonIndex = indexLoadingConfig.getJsonIndexConfigs().containsKey(columnName);
     boolean loadH3Index = indexLoadingConfig.getH3IndexConfigs().containsKey(columnName);
     boolean loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
     BloomFilterConfig bloomFilterConfig = indexLoadingConfig.getBloomFilterConfigs().get(columnName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 78dc118a78..d83ebbfe58 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FSTType;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
@@ -66,11 +67,11 @@ public class IndexLoadingConfig {
   private Set<String> _textIndexColumns = new HashSet<>();
   private Set<String> _fstIndexColumns = new HashSet<>();
   private FSTType _fstIndexType = FSTType.LUCENE;
-  private Set<String> _jsonIndexColumns = new HashSet<>();
+  private Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
   private Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
-  private Map<String, String> _noDictionaryConfig = new HashMap<>();
-  private Set<String> _varLengthDictionaryColumns = new HashSet<>();
+  private final Map<String, String> _noDictionaryConfig = new HashMap<>();
+  private final Set<String> _varLengthDictionaryColumns = new HashSet<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
   private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
   private boolean _enableDynamicStarTreeCreation;
@@ -118,9 +119,18 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
-    List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
-    if (jsonIndexColumns != null) {
-      _jsonIndexColumns.addAll(jsonIndexColumns);
+    // Ignore jsonIndexColumns when jsonIndexConfigs is configured
+    Map<String, JsonIndexConfig> jsonIndexConfigs = indexingConfig.getJsonIndexConfigs();
+    if (jsonIndexConfigs != null) {
+      _jsonIndexConfigs = jsonIndexConfigs;
+    } else {
+      List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+      if (jsonIndexColumns != null) {
+        _jsonIndexConfigs = new HashMap<>();
+        for (String jsonIndexColumn : jsonIndexColumns) {
+          _jsonIndexConfigs.put(jsonIndexColumn, new JsonIndexConfig());
+        }
+      }
     }
 
     List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
@@ -372,8 +382,8 @@ public class IndexLoadingConfig {
     return _fstIndexColumns;
   }
 
-  public Set<String> getJsonIndexColumns() {
-    return _jsonIndexColumns;
+  public Map<String, JsonIndexConfig> getJsonIndexConfigs() {
+    return _jsonIndexConfigs;
   }
 
   public Map<String, H3IndexConfig> getH3IndexConfigs() {
@@ -445,7 +455,14 @@ public class IndexLoadingConfig {
 
   @VisibleForTesting
   public void setJsonIndexColumns(Set<String> jsonIndexColumns) {
-    _jsonIndexColumns = jsonIndexColumns;
+    // Null means "no JSON index columns"; keep the map non-null because callers of
+    // getJsonIndexConfigs() invoke containsKey()/keySet() on it without a null check
+    _jsonIndexConfigs = new HashMap<>();
+    if (jsonIndexColumns != null) {
+      for (String jsonIndexColumn : jsonIndexColumns) {
+        _jsonIndexConfigs.put(jsonIndexColumn, new JsonIndexConfig());
+      }
+    }
   }
 
   @VisibleForTesting
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index e064448673..dcdaf7f81d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
@@ -40,6 +41,7 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,17 +52,17 @@ public class JsonIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(JsonIndexHandler.class);
 
   private final SegmentMetadata _segmentMetadata;
-  private final Set<String> _columnsToAddIdx;
+  private final Map<String, JsonIndexConfig> _jsonIndexConfigs;
 
   public JsonIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _columnsToAddIdx = indexLoadingConfig.getJsonIndexColumns();
+    _jsonIndexConfigs = indexLoadingConfig.getJsonIndexConfigs();
   }
 
   @Override
   public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
     String segmentName = _segmentMetadata.getName();
-    Set<String> columnsToAddIdx = new HashSet<>(_columnsToAddIdx);
+    Set<String> columnsToAddIdx = new HashSet<>(_jsonIndexConfigs.keySet());
     Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
     // Check if any existing index need to be removed.
     for (String column : existingColumns) {
@@ -85,7 +87,7 @@ public class JsonIndexHandler implements IndexHandler {
       throws Exception {
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> columnsToAddIdx = new HashSet<>(_columnsToAddIdx);
+    Set<String> columnsToAddIdx = new HashSet<>(_jsonIndexConfigs.keySet());
     Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
     for (String column : existingColumns) {
       if (!columnsToAddIdx.remove(column)) {
@@ -152,11 +154,13 @@ public class JsonIndexHandler implements IndexHandler {
       JsonIndexCreatorProvider indexCreatorProvider)
       throws IOException {
     File indexDir = _segmentMetadata.getIndexDir();
+    String columnName = columnMetadata.getColumnName();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
         Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata);
-        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
-            .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(
+            IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+                .forJsonIndex(_jsonIndexConfigs.get(columnName)))) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -170,10 +174,12 @@ public class JsonIndexHandler implements IndexHandler {
       JsonIndexCreatorProvider indexCreatorProvider)
       throws IOException {
     File indexDir = _segmentMetadata.getIndexDir();
+    String columnName = columnMetadata.getColumnName();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
-            .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(
+            IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+                .forJsonIndex(_jsonIndexConfigs.get(columnName)))) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 8e880865c0..ed6e4d9095 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -754,11 +754,18 @@ public final class TableConfigUtils {
         columnNameToConfigMap.put(columnName, "Segment Partition Config");
       }
     }
-    if (indexingConfig.getJsonIndexColumns() != null) {
-      for (String columnName : indexingConfig.getJsonIndexColumns()) {
-        columnNameToConfigMap.put(columnName, "Json Index Config");
+    Set<String> jsonIndexColumns = new HashSet<>();
+    // Ignore jsonIndexColumns when jsonIndexConfigs is configured
+    if (indexingConfig.getJsonIndexConfigs() != null) {
+      jsonIndexColumns.addAll(indexingConfig.getJsonIndexConfigs().keySet());
+    } else {
+      if (indexingConfig.getJsonIndexColumns() != null) {
+        jsonIndexColumns.addAll(indexingConfig.getJsonIndexColumns());
       }
     }
+    for (String columnName : jsonIndexColumns) {
+      columnNameToConfigMap.put(columnName, "Json Index Config");
+    }
 
     List<StarTreeIndexConfig> starTreeIndexConfigList = indexingConfig.getStarTreeIndexConfigs();
     if (starTreeIndexConfigList != null) {
@@ -828,13 +835,11 @@ public final class TableConfigUtils {
       }
     }
 
-    if (indexingConfig.getJsonIndexColumns() != null) {
-      for (String jsonIndexCol : indexingConfig.getJsonIndexColumns()) {
-        FieldSpec fieldSpec = schema.getFieldSpecFor(jsonIndexCol);
-        Preconditions.checkState(
-            fieldSpec.isSingleValueField() && fieldSpec.getDataType().getStoredType() == DataType.STRING,
-            "Json index can only be created for single value String column. Invalid for column: %s", jsonIndexCol);
-      }
+    for (String jsonIndexColumn : jsonIndexColumns) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(jsonIndexColumn);
+      Preconditions.checkState(
+          fieldSpec.isSingleValueField() && fieldSpec.getDataType().getStoredType() == DataType.STRING,
+          "Json index can only be created for single value String column. Invalid for column: %s", jsonIndexColumn);
     }
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
index 5207884ab0..30be137465 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,13 +53,12 @@ public class IndexingFailureTest {
   public void setup() {
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
         .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(JSON_COL, FieldSpec.DataType.JSON)
-        .setSchemaName(TABLE_NAME)
-        .build();
+        .addSingleValueDimension(JSON_COL, FieldSpec.DataType.JSON).setSchemaName(TABLE_NAME).build();
     _serverMetrics = mock(ServerMetrics.class);
-    _mutableSegment = MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
-        Collections.emptySet(), Collections.emptySet(), new HashSet<>(Arrays.asList(INT_COL, STRING_COL)),
-        Collections.singleton(JSON_COL), _serverMetrics);
+    _mutableSegment =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.emptySet(), Collections.emptySet(),
+            new HashSet<>(Arrays.asList(INT_COL, STRING_COL)),
+            Collections.singletonMap(JSON_COL, new JsonIndexConfig()), _serverMetrics);
   }
 
   @Test
@@ -125,7 +125,6 @@ public class IndexingFailureTest {
     assertTrue(_mutableSegment.getDataSource(STRING_COL).getNullValueVector().isNull(3));
     // null string value skipped
     verify(_serverMetrics, times(1)).addMeteredTableValue(matches("DICTIONARY-indexingError$"),
-        eq(ServerMeter.INDEXING_FAILURES),
-        eq(1L));
+        eq(ServerMeter.INDEXING_FAILURES), eq(1L));
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 43309ea52a..4c0a924c81 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -28,6 +29,7 @@ import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -49,7 +51,7 @@ public class MutableSegmentImplTestUtils {
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
       List<AggregationConfig> preAggregationConfigs) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns,
-        Collections.emptySet(), false, false, null, null, null, null, null, preAggregationConfigs);
+        Collections.emptyMap(), false, false, null, null, null, null, null, preAggregationConfigs);
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
@@ -71,21 +73,21 @@ public class MutableSegmentImplTestUtils {
       PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns,
-        Collections.emptySet(), aggregateMetrics, nullHandlingEnabled, upsertConfig, timeColumnName,
+        Collections.emptyMap(), aggregateMetrics, nullHandlingEnabled, upsertConfig, timeColumnName,
         partitionUpsertMetadataManager, partitionDedupMetadataManager, null, Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
-      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> jsonIndexColumns,
-      ServerMetrics serverMetrics) {
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
+      Map<String, JsonIndexConfig> jsonIndexConfigs, ServerMetrics serverMetrics) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns,
-        jsonIndexColumns, false, true, null, null, null, null, serverMetrics, Collections.emptyList());
+        jsonIndexConfigs, false, true, null, null, null, null, serverMetrics, Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
-      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> jsonIndexColumns,
-      boolean aggregateMetrics, boolean nullHandlingEnabled, UpsertConfig upsertConfig, String timeColumnName,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
+      Map<String, JsonIndexConfig> jsonIndexConfigs, boolean aggregateMetrics, boolean nullHandlingEnabled,
+      UpsertConfig upsertConfig, String timeColumnName, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, ServerMetrics serverMetrics,
       List<AggregationConfig> aggregationConfigs) {
 
@@ -98,7 +100,7 @@ public class MutableSegmentImplTestUtils {
     RealtimeSegmentConfig realtimeSegmentConfig =
         new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
             .setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
-            .setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexColumns(jsonIndexColumns)
+            .setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexConfigs(jsonIndexConfigs)
             .setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
             .setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index c62ddbcff8..656b47a9c5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -78,7 +79,8 @@ public class JsonIndexTest {
     // @formatter: on
 
     String onHeapColumnName = "onHeap";
-    try (JsonIndexCreator onHeapIndexCreator = new OnHeapJsonIndexCreator(INDEX_DIR, onHeapColumnName)) {
+    try (JsonIndexCreator onHeapIndexCreator = new OnHeapJsonIndexCreator(INDEX_DIR, onHeapColumnName,
+        new JsonIndexConfig())) {
       for (String record : records) {
         onHeapIndexCreator.add(record);
       }
@@ -88,7 +90,8 @@ public class JsonIndexTest {
     Assert.assertTrue(onHeapIndexFile.exists());
 
     String offHeapColumnName = "offHeap";
-    try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(INDEX_DIR, offHeapColumnName)) {
+    try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(INDEX_DIR, offHeapColumnName,
+        new JsonIndexConfig())) {
       for (String record : records) {
         offHeapIndexCreator.add(record);
       }
@@ -101,7 +104,7 @@ public class JsonIndexTest {
         PinotDataBuffer offHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
         JsonIndexReader onHeapIndexReader = new ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
         JsonIndexReader offHeapIndexReader = new ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
-        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl()) {
+        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new JsonIndexConfig())) {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
@@ -158,12 +161,12 @@ public class JsonIndexTest {
     for (int i = 0; i < numRecords; i++) {
       records[i] = String.format(
           "{\"name\":\"adam-%d\",\"addresses\":[{\"street\":\"us-%d\",\"country\":\"us\"},{\"street\":\"ca-%d\","
-              + "\"country\":\"ca\"}]}",
-          i, i, i);
+              + "\"country\":\"ca\"}]}", i, i, i);
     }
 
     String onHeapColumnName = "onHeap";
-    try (JsonIndexCreator onHeapIndexCreator = new OnHeapJsonIndexCreator(INDEX_DIR, onHeapColumnName)) {
+    try (JsonIndexCreator onHeapIndexCreator = new OnHeapJsonIndexCreator(INDEX_DIR, onHeapColumnName,
+        new JsonIndexConfig())) {
       for (String record : records) {
         onHeapIndexCreator.add(record);
       }
@@ -173,7 +176,8 @@ public class JsonIndexTest {
     Assert.assertTrue(onHeapIndexFile.exists());
 
     String offHeapColumnName = "offHeap";
-    try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(INDEX_DIR, offHeapColumnName)) {
+    try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(INDEX_DIR, offHeapColumnName,
+        new JsonIndexConfig())) {
       for (String record : records) {
         offHeapIndexCreator.add(record);
       }
@@ -186,7 +190,7 @@ public class JsonIndexTest {
         PinotDataBuffer offHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
         JsonIndexReader onHeapIndexReader = new ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
         JsonIndexReader offHeapIndexReader = new ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
-        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl()) {
+        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new JsonIndexConfig())) {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 5993f312f8..732ef48cec 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -27,6 +27,7 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -281,8 +282,8 @@ public interface IndexCreationContext {
       return new Inverted(this);
     }
 
-    public Json forJsonIndex() {
-      return new Json(this);
+    public Json forJsonIndex(JsonIndexConfig jsonIndexConfig) {
+      return new Json(this, jsonIndexConfig);
     }
 
     public Range forRangeIndex(int rangeIndexVersion) {
@@ -426,9 +427,15 @@ public interface IndexCreationContext {
   }
 
   class Json extends Wrapper {
+    private final JsonIndexConfig _jsonIndexConfig;
 
-    Json(IndexCreationContext delegate) {
+    public Json(IndexCreationContext delegate, JsonIndexConfig jsonIndexConfig) {
       super(delegate);
+      _jsonIndexConfig = jsonIndexConfig;
+    }
+
+    public JsonIndexConfig getJsonIndexConfig() {
+      return _jsonIndexConfig;
     }
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index ba765f5755..b5539b8b17 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -42,6 +42,7 @@ import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.spi.config.table.FSTType;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -80,7 +81,7 @@ public class SegmentGeneratorConfig implements Serializable {
   private final List<String> _rangeIndexCreationColumns = new ArrayList<>();
   private final List<String> _textIndexCreationColumns = new ArrayList<>();
   private final List<String> _fstIndexCreationColumns = new ArrayList<>();
-  private final List<String> _jsonIndexCreationColumns = new ArrayList<>();
+  private final Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
   private final Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
   private final Map<String, List<TimestampIndexGranularity>> _timestampIndexConfigs = new HashMap<>();
   private final List<String> _columnSortOrder = new ArrayList<>();
@@ -207,8 +208,17 @@ public class SegmentGeneratorConfig implements Serializable {
         _rangeIndexCreationColumns.addAll(indexingConfig.getRangeIndexColumns());
       }
 
-      if (indexingConfig.getJsonIndexColumns() != null) {
-        _jsonIndexCreationColumns.addAll(indexingConfig.getJsonIndexColumns());
+      // Ignore jsonIndexColumns when jsonIndexConfigs is configured
+      Map<String, JsonIndexConfig> jsonIndexConfigs = indexingConfig.getJsonIndexConfigs();
+      if (jsonIndexConfigs != null) {
+        _jsonIndexConfigs.putAll(jsonIndexConfigs);
+      } else {
+        List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+        if (jsonIndexColumns != null) {
+          for (String jsonIndexColumn : jsonIndexColumns) {
+            _jsonIndexConfigs.put(jsonIndexColumn, new JsonIndexConfig());
+          }
+        }
       }
 
       List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
@@ -412,8 +422,8 @@ public class SegmentGeneratorConfig implements Serializable {
     return _fstIndexCreationColumns;
   }
 
-  public List<String> getJsonIndexCreationColumns() {
-    return _jsonIndexCreationColumns;
+  public Map<String, JsonIndexConfig> getJsonIndexConfigs() {
+    return _jsonIndexConfigs;
   }
 
   public Map<String, H3IndexConfig> getH3IndexConfigs() {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
index 7d86dae819..9bc8695e7e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.index.mutable.provider;
 
 import java.util.Objects;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -144,8 +145,8 @@ public interface MutableIndexContext {
       return new Inverted(this);
     }
 
-    public Json forJsonIndex() {
-      return new Json(this);
+    public Json forJsonIndex(JsonIndexConfig jsonIndexConfig) {
+      return new Json(this, jsonIndexConfig);
     }
 
     public Text forTextIndex() {
@@ -234,9 +235,15 @@ public interface MutableIndexContext {
   }
 
   class Json extends Wrapper {
+    private final JsonIndexConfig _jsonIndexConfig;
 
-    public Json(MutableIndexContext wrapped) {
+    public Json(MutableIndexContext wrapped, JsonIndexConfig jsonIndexConfig) {
       super(wrapped);
+      _jsonIndexConfig = jsonIndexConfig;
+    }
+
+    public JsonIndexConfig getJsonIndexConfig() {
+      return _jsonIndexConfig;
     }
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 0211c4c48f..2ff2fe1b66 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -32,7 +32,9 @@ public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
   private List<String> _rangeIndexColumns;
   private int _rangeIndexVersion = DEFAULT_RANGE_INDEX_VERSION;
+  @Deprecated
   private List<String> _jsonIndexColumns;
+  private Map<String, JsonIndexConfig> _jsonIndexConfigs;
   private List<String> _h3IndexColumns;
   private List<String> _sortedColumn;
   private List<String> _bloomFilterColumns;
@@ -114,6 +116,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _jsonIndexColumns = jsonIndexColumns;
   }
 
+  public Map<String, JsonIndexConfig> getJsonIndexConfigs() {
+    return _jsonIndexConfigs;
+  }
+
+  public void setJsonIndexConfigs(Map<String, JsonIndexConfig> jsonIndexConfigs) {
+    _jsonIndexConfigs = jsonIndexConfigs;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
new file mode 100644
index 0000000000..292b676c03
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configs related to the JSON index:
+ * - maxLevels: Max levels to flatten the json object (array is also counted as one level), non-positive value means
+ *              unlimited
+ * - excludeArray: Whether to exclude arrays when flattening the object
+ * - disableCrossArrayUnnest: Whether to disable unnesting across multiple arrays (the combination of all elements)
+ * - includePaths: Only include the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with excludePaths). Paths
+ *                 under the included paths will be included, e.g. "$.a.b.c" will be included when "$.a.b" is configured
+ *                 to be included.
+ * - excludePaths: Exclude the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with includePaths). Paths under
+ *                 the excluded paths will also be excluded, e.g. "$.a.b.c" will be excluded when "$.a.b" is configured
+ *                 to be excluded.
+ * - excludeFields: Exclude the given fields, e.g. "b", "c", even if they are under the included paths.
+ */
+public class JsonIndexConfig extends BaseJsonConfig {
+  private int _maxLevels = -1;
+  private boolean _excludeArray = false;
+  private boolean _disableCrossArrayUnnest = false;
+  private Set<String> _includePaths;
+  private Set<String> _excludePaths;
+  private Set<String> _excludeFields;
+
+  public int getMaxLevels() {
+    return _maxLevels;
+  }
+
+  public void setMaxLevels(int maxLevels) {
+    _maxLevels = maxLevels;
+  }
+
+  public boolean isExcludeArray() {
+    return _excludeArray;
+  }
+
+  public void setExcludeArray(boolean excludeArray) {
+    _excludeArray = excludeArray;
+  }
+
+  public boolean isDisableCrossArrayUnnest() {
+    return _disableCrossArrayUnnest;
+  }
+
+  public void setDisableCrossArrayUnnest(boolean disableCrossArrayUnnest) {
+    _disableCrossArrayUnnest = disableCrossArrayUnnest;
+  }
+
+  @Nullable
+  public Set<String> getIncludePaths() {
+    return _includePaths;
+  }
+
+  public void setIncludePaths(@Nullable Set<String> includePaths) {
+    Preconditions.checkArgument(includePaths == null || _excludePaths == null,
+        "Cannot configure both include and exclude paths");
+    Preconditions.checkArgument(includePaths == null || includePaths.size() > 0, "Include paths cannot be empty");
+    _includePaths = includePaths;
+  }
+
+  @Nullable
+  public Set<String> getExcludePaths() {
+    return _excludePaths;
+  }
+
+  public void setExcludePaths(@Nullable Set<String> excludePaths) {
+    Preconditions.checkArgument(excludePaths == null || _includePaths == null,
+        "Cannot configure both include and exclude paths");
+    _excludePaths = excludePaths;
+  }
+
+  @Nullable
+  public Set<String> getExcludeFields() {
+    return _excludeFields;
+  }
+
+  public void setExcludeFields(@Nullable Set<String> excludeFields) {
+    _excludeFields = excludeFields;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 670f510a5a..3bf8a7f989 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -47,6 +47,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
@@ -54,6 +55,7 @@ import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -70,7 +72,9 @@ public class JsonUtils {
   // For flattening
   public static final String VALUE_KEY = "";
   public static final String KEY_SEPARATOR = ".";
+  public static final String ARRAY_PATH = "[*]";
   public static final String ARRAY_INDEX_KEY = ".$index";
+  public static final int MAX_COMBINATIONS = 100_000;
 
   // For querying
   public static final String WILDCARD = "*";
@@ -346,7 +350,12 @@ public class JsonUtils {
    * ]
    * </pre>
    */
-  public static List<Map<String, String>> flatten(JsonNode node) {
+  public static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
+    return flatten(node, jsonIndexConfig, 0, "$", false);
+  }
+
+  private static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig, int level,
+      String path, boolean includePathMatched) {
     // Null
     if (node.isNull()) {
       return Collections.emptyList();
@@ -357,23 +366,42 @@ public class JsonUtils {
       return Collections.singletonList(Collections.singletonMap(VALUE_KEY, node.asText()));
     }
 
+    Preconditions.checkArgument(node.isArray() || node.isObject(), "Unexpected node type: %s", node.getNodeType());
+
+    // Do not flatten further for array and object when max level reached
+    int maxLevels = jsonIndexConfig.getMaxLevels();
+    if (maxLevels > 0 && level == maxLevels) {
+      return Collections.emptyList();
+    }
+
     // Array
     if (node.isArray()) {
-      List<Map<String, String>> results = new ArrayList<>();
+      if (jsonIndexConfig.isExcludeArray()) {
+        return Collections.emptyList();
+      }
       int numChildren = node.size();
+      if (numChildren == 0) {
+        return Collections.emptyList();
+      }
+      String childPath = path + ARRAY_PATH;
+      IncludeResult includeResult =
+          includePathMatched ? IncludeResult.MATCH : shouldInclude(jsonIndexConfig, childPath);
+      if (!includeResult._shouldInclude) {
+        return Collections.emptyList();
+      }
+      List<Map<String, String>> results = new ArrayList<>(numChildren);
       for (int i = 0; i < numChildren; i++) {
         JsonNode childNode = node.get(i);
         String arrayIndexValue = Integer.toString(i);
-        List<Map<String, String>> childResults = flatten(childNode);
+        List<Map<String, String>> childResults =
+            flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched);
         for (Map<String, String> childResult : childResults) {
-          if (!childResult.isEmpty()) {
-            Map<String, String> result = new TreeMap<>();
-            for (Map.Entry<String, String> entry : childResult.entrySet()) {
-              result.put(KEY_SEPARATOR + entry.getKey(), entry.getValue());
-            }
-            result.put(ARRAY_INDEX_KEY, arrayIndexValue);
-            results.add(result);
+          Map<String, String> result = new TreeMap<>();
+          for (Map.Entry<String, String> entry : childResult.entrySet()) {
+            result.put(KEY_SEPARATOR + entry.getKey(), entry.getValue());
           }
+          result.put(ARRAY_INDEX_KEY, arrayIndexValue);
+          results.add(result);
         }
       }
       return results;
@@ -390,8 +418,20 @@ public class JsonUtils {
     Iterator<Map.Entry<String, JsonNode>> fieldIterator = node.fields();
     while (fieldIterator.hasNext()) {
       Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      String field = fieldEntry.getKey();
+      Set<String> excludeFields = jsonIndexConfig.getExcludeFields();
+      if (excludeFields != null && excludeFields.contains(field)) {
+        continue;
+      }
+      String childPath = path + KEY_SEPARATOR + field;
+      IncludeResult includeResult =
+          includePathMatched ? IncludeResult.MATCH : shouldInclude(jsonIndexConfig, childPath);
+      if (!includeResult._shouldInclude) {
+        continue;
+      }
       JsonNode childNode = fieldEntry.getValue();
-      List<Map<String, String>> childResults = flatten(childNode);
+      List<Map<String, String>> childResults =
+          flatten(childNode, jsonIndexConfig, level + 1, childPath, includeResult._includePathMatched);
       int numChildResults = childResults.size();
 
       // Empty list - skip
@@ -400,7 +440,7 @@ public class JsonUtils {
       }
 
       // Single map - put all key-value pairs into the non-nested result map
-      String prefix = KEY_SEPARATOR + fieldEntry.getKey();
+      String prefix = KEY_SEPARATOR + field;
       if (numChildResults == 1) {
         Map<String, String> childResult = childResults.get(0);
         for (Map.Entry<String, String> entry : childResult.entrySet()) {
@@ -412,21 +452,11 @@ public class JsonUtils {
       // Multiple maps - put the results into a list to be processed later
       List<Map<String, String>> prefixedResults = new ArrayList<>(numChildResults);
       for (Map<String, String> childResult : childResults) {
-        if (!childResult.isEmpty()) {
-          Map<String, String> prefixedResult = new TreeMap<>();
-          for (Map.Entry<String, String> entry : childResult.entrySet()) {
-            prefixedResult.put(prefix + entry.getKey(), entry.getValue());
-          }
-          prefixedResults.add(prefixedResult);
+        Map<String, String> prefixedResult = new TreeMap<>();
+        for (Map.Entry<String, String> entry : childResult.entrySet()) {
+          prefixedResult.put(prefix + entry.getKey(), entry.getValue());
         }
-      }
-      int numPrefixedResults = prefixedResults.size();
-      if (numPrefixedResults == 0) {
-        continue;
-      }
-      if (numPrefixedResults == 1) {
-        nonNestedResult.putAll(prefixedResults.get(0));
-        continue;
+        prefixedResults.add(prefixedResult);
       }
       nestedResultsList.add(prefixedResults);
     }
@@ -447,10 +477,64 @@ public class JsonUtils {
       }
       return nestedResults;
     }
-    // If there are multiple child nodes with multiple records, calculate each combination of them as a new record.
-    List<Map<String, String>> results = new ArrayList<>();
-    unnestResults(nestedResultsList.get(0), nestedResultsList, 1, nonNestedResult, results);
-    return results;
+    // Multiple child nodes with multiple records
+    if (jsonIndexConfig.isDisableCrossArrayUnnest()) {
+      // Add each array individually
+      int numResults = 0;
+      for (List<Map<String, String>> nestedResults : nestedResultsList) {
+        numResults += nestedResults.size();
+      }
+      List<Map<String, String>> results = new ArrayList<>(numResults);
+      for (List<Map<String, String>> nestedResults : nestedResultsList) {
+        for (Map<String, String> nestedResult : nestedResults) {
+          nestedResult.putAll(nonNestedResult);
+          results.add(nestedResult);
+        }
+      }
+      return results;
+    } else {
+      // Calculate each combination of them as a new record
+      long numResults = 1;
+      for (List<Map<String, String>> nestedResults : nestedResultsList) {
+        numResults *= nestedResults.size();
+        Preconditions.checkState(numResults < MAX_COMBINATIONS, "Got too many combinations");
+      }
+      List<Map<String, String>> results = new ArrayList<>((int) numResults);
+      unnestResults(nestedResultsList.get(0), nestedResultsList, 1, nonNestedResult, results);
+      return results;
+    }
+  }
+
+  private static IncludeResult shouldInclude(JsonIndexConfig jsonIndexConfig, String path) {
+    Set<String> includePaths = jsonIndexConfig.getIncludePaths();
+    if (includePaths != null) {
+      if (includePaths.contains(path)) {
+        return IncludeResult.MATCH;
+      }
+      for (String includePath : includePaths) {
+        if (includePath.startsWith(path)) {
+          return IncludeResult.POTENTIAL_MATCH;
+        }
+      }
+      return IncludeResult.NOT_MATCH;
+    }
+    Set<String> excludePaths = jsonIndexConfig.getExcludePaths();
+    if (excludePaths != null && excludePaths.contains(path)) {
+      return IncludeResult.NOT_MATCH;
+    }
+    return IncludeResult.POTENTIAL_MATCH;
+  }
+
+  private enum IncludeResult {
+    MATCH(true, true), POTENTIAL_MATCH(true, false), NOT_MATCH(false, false);
+
+    final boolean _shouldInclude;
+    final boolean _includePathMatched;
+
+    IncludeResult(boolean shouldInclude, boolean includePathMatched) {
+      _shouldInclude = shouldInclude;
+      _includePathMatched = includePathMatched;
+    }
   }
 
   private static void unnestResults(List<Map<String, String>> currentResults,
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
index b1df9ce6d8..8f425efef9 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
@@ -20,20 +20,22 @@ package org.apache.pinot.spi.utils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import org.testng.collections.Lists;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -45,99 +47,110 @@ public class JsonUtilsTest {
   @Test
   public void testFlatten()
       throws IOException {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("null");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("123");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 1);
       assertEquals(flattenedRecords.get(0), Collections.singletonMap("", "123"));
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("[]");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("[1,2,3]");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 3);
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".$index"), "0");
-      assertEquals(firstFlattenedRecord.get("."), "1");
-      Map<String, String> secondFlattenedRecord = flattenedRecords.get(1);
-      assertEquals(secondFlattenedRecord.get(".$index"), "1");
-      assertEquals(secondFlattenedRecord.get("."), "2");
-      Map<String, String> thirdFlattenedRecord = flattenedRecords.get(2);
-      assertEquals(thirdFlattenedRecord.get(".$index"), "2");
-      assertEquals(thirdFlattenedRecord.get("."), "3");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.size(), 2);
+      assertEquals(flattenedRecord0.get(".$index"), "0");
+      assertEquals(flattenedRecord0.get("."), "1");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.size(), 2);
+      assertEquals(flattenedRecord1.get(".$index"), "1");
+      assertEquals(flattenedRecord1.get("."), "2");
+      Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+      assertEquals(flattenedRecord2.size(), 2);
+      assertEquals(flattenedRecord2.get(".$index"), "2");
+      assertEquals(flattenedRecord2.get("."), "3");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("[1,[2,3],[4,[5,6]]]]");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 6);
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".$index"), "0");
-      assertEquals(firstFlattenedRecord.get("."), "1");
-      Map<String, String> secondFlattenedRecord = flattenedRecords.get(1);
-      assertEquals(secondFlattenedRecord.get(".$index"), "1");
-      assertEquals(secondFlattenedRecord.get("..$index"), "0");
-      assertEquals(secondFlattenedRecord.get(".."), "2");
-      Map<String, String> thirdFlattenedRecord = flattenedRecords.get(2);
-      assertEquals(thirdFlattenedRecord.get(".$index"), "1");
-      assertEquals(thirdFlattenedRecord.get("..$index"), "1");
-      assertEquals(thirdFlattenedRecord.get(".."), "3");
-      Map<String, String> fourthFlattenedRecord = flattenedRecords.get(3);
-      assertEquals(fourthFlattenedRecord.get(".$index"), "2");
-      assertEquals(fourthFlattenedRecord.get("..$index"), "0");
-      assertEquals(fourthFlattenedRecord.get(".."), "4");
-      Map<String, String> fifthFlattenedRecord = flattenedRecords.get(4);
-      assertEquals(fifthFlattenedRecord.get(".$index"), "2");
-      assertEquals(fifthFlattenedRecord.get("..$index"), "1");
-      assertEquals(fifthFlattenedRecord.get("...$index"), "0");
-      assertEquals(fifthFlattenedRecord.get("..."), "5");
-      Map<String, String> sixthFlattenedRecord = flattenedRecords.get(5);
-      assertEquals(sixthFlattenedRecord.get(".$index"), "2");
-      assertEquals(sixthFlattenedRecord.get("..$index"), "1");
-      assertEquals(sixthFlattenedRecord.get("...$index"), "1");
-      assertEquals(sixthFlattenedRecord.get("..."), "6");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.size(), 2);
+      assertEquals(flattenedRecord0.get(".$index"), "0");
+      assertEquals(flattenedRecord0.get("."), "1");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.size(), 3);
+      assertEquals(flattenedRecord1.get(".$index"), "1");
+      assertEquals(flattenedRecord1.get("..$index"), "0");
+      assertEquals(flattenedRecord1.get(".."), "2");
+      Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+      assertEquals(flattenedRecord2.size(), 3);
+      assertEquals(flattenedRecord2.get(".$index"), "1");
+      assertEquals(flattenedRecord2.get("..$index"), "1");
+      assertEquals(flattenedRecord2.get(".."), "3");
+      Map<String, String> flattenedRecord3 = flattenedRecords.get(3);
+      assertEquals(flattenedRecord3.size(), 3);
+      assertEquals(flattenedRecord3.get(".$index"), "2");
+      assertEquals(flattenedRecord3.get("..$index"), "0");
+      assertEquals(flattenedRecord3.get(".."), "4");
+      Map<String, String> flattenedRecord4 = flattenedRecords.get(4);
+      assertEquals(flattenedRecord4.size(), 4);
+      assertEquals(flattenedRecord4.get(".$index"), "2");
+      assertEquals(flattenedRecord4.get("..$index"), "1");
+      assertEquals(flattenedRecord4.get("...$index"), "0");
+      assertEquals(flattenedRecord4.get("..."), "5");
+      Map<String, String> flattenedRecord5 = flattenedRecords.get(5);
+      assertEquals(flattenedRecord5.size(), 4);
+      assertEquals(flattenedRecord5.get(".$index"), "2");
+      assertEquals(flattenedRecord5.get("..$index"), "1");
+      assertEquals(flattenedRecord5.get("...$index"), "1");
+      assertEquals(flattenedRecord5.get("..."), "6");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("{}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("{\"key\":{}}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("[{},{},{}]");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("{\"key\":[]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertTrue(flattenedRecords.isEmpty());
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode("{\"name\":\"adam\",\"age\":20}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 1);
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".name"), "adam");
-      assertEquals(firstFlattenedRecord.get(".age"), "20");
+      Map<String, String> flattenedRecord = flattenedRecords.get(0);
+      assertEquals(flattenedRecord.size(), 2);
+      assertEquals(flattenedRecord.get(".name"), "adam");
+      assertEquals(flattenedRecord.get(".age"), "20");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},{\"country\":\"ca\",\"street\":\"second st\","
               + "\"number\":2}]");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 2);
       for (Map<String, String> flattenedRecord : flattenedRecords) {
         assertEquals(flattenedRecord.size(), 4);
@@ -146,22 +159,22 @@ public class JsonUtilsTest {
         assertTrue(flattenedRecord.containsKey("..street"));
         assertTrue(flattenedRecord.containsKey("..number"));
       }
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".$index"), "0");
-      assertEquals(firstFlattenedRecord.get("..country"), "us");
-      assertEquals(firstFlattenedRecord.get("..street"), "main st");
-      assertEquals(firstFlattenedRecord.get("..number"), "1");
-      Map<String, String> secondFlattenedRecord = flattenedRecords.get(1);
-      assertEquals(secondFlattenedRecord.get(".$index"), "1");
-      assertEquals(secondFlattenedRecord.get("..country"), "ca");
-      assertEquals(secondFlattenedRecord.get("..street"), "second st");
-      assertEquals(secondFlattenedRecord.get("..number"), "2");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.get(".$index"), "0");
+      assertEquals(flattenedRecord0.get("..country"), "us");
+      assertEquals(flattenedRecord0.get("..street"), "main st");
+      assertEquals(flattenedRecord0.get("..number"), "1");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.get(".$index"), "1");
+      assertEquals(flattenedRecord1.get("..country"), "ca");
+      assertEquals(flattenedRecord1.get("..street"), "second st");
+      assertEquals(flattenedRecord1.get("..number"), "2");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "{\"name\":\"adam\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},"
               + "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 2);
       for (Map<String, String> flattenedRecord : flattenedRecords) {
         assertEquals(flattenedRecord.size(), 5);
@@ -171,23 +184,23 @@ public class JsonUtilsTest {
         assertTrue(flattenedRecord.containsKey(".addresses..street"));
         assertTrue(flattenedRecord.containsKey(".addresses..number"));
       }
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".addresses.$index"), "0");
-      assertEquals(firstFlattenedRecord.get(".addresses..country"), "us");
-      assertEquals(firstFlattenedRecord.get(".addresses..street"), "main st");
-      assertEquals(firstFlattenedRecord.get(".addresses..number"), "1");
-      Map<String, String> secondFlattenedRecord = flattenedRecords.get(1);
-      assertEquals(secondFlattenedRecord.get(".addresses.$index"), "1");
-      assertEquals(secondFlattenedRecord.get(".addresses..country"), "ca");
-      assertEquals(secondFlattenedRecord.get(".addresses..street"), "second st");
-      assertEquals(secondFlattenedRecord.get(".addresses..number"), "2");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+      assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+      assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+      assertEquals(flattenedRecord0.get(".addresses..number"), "1");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.get(".addresses.$index"), "1");
+      assertEquals(flattenedRecord1.get(".addresses..country"), "ca");
+      assertEquals(flattenedRecord1.get(".addresses..street"), "second st");
+      assertEquals(flattenedRecord1.get(".addresses..number"), "2");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "{\"name\":\"adam\",\"age\":20,\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},"
               + "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}],\"skills\":[\"english\","
               + "\"programming\"]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 4);
       for (Map<String, String> flattenedRecord : flattenedRecords) {
         assertEquals(flattenedRecord.size(), 8);
@@ -200,26 +213,26 @@ public class JsonUtilsTest {
         assertTrue(flattenedRecord.containsKey(".skills.$index"));
         assertTrue(flattenedRecord.containsKey(".skills."));
       }
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.get(".addresses.$index"), "0");
-      assertEquals(firstFlattenedRecord.get(".addresses..country"), "us");
-      assertEquals(firstFlattenedRecord.get(".addresses..street"), "main st");
-      assertEquals(firstFlattenedRecord.get(".addresses..number"), "1");
-      assertEquals(firstFlattenedRecord.get(".skills.$index"), "0");
-      assertEquals(firstFlattenedRecord.get(".skills."), "english");
-      Map<String, String> lastFlattenedRecord = flattenedRecords.get(3);
-      assertEquals(lastFlattenedRecord.get(".addresses.$index"), "1");
-      assertEquals(lastFlattenedRecord.get(".addresses..country"), "ca");
-      assertEquals(lastFlattenedRecord.get(".addresses..street"), "second st");
-      assertEquals(lastFlattenedRecord.get(".addresses..number"), "2");
-      assertEquals(lastFlattenedRecord.get(".skills.$index"), "1");
-      assertEquals(lastFlattenedRecord.get(".skills."), "programming");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+      assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+      assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+      assertEquals(flattenedRecord0.get(".addresses..number"), "1");
+      assertEquals(flattenedRecord0.get(".skills.$index"), "0");
+      assertEquals(flattenedRecord0.get(".skills."), "english");
+      Map<String, String> flattenedRecord3 = flattenedRecords.get(3);
+      assertEquals(flattenedRecord3.get(".addresses.$index"), "1");
+      assertEquals(flattenedRecord3.get(".addresses..country"), "ca");
+      assertEquals(flattenedRecord3.get(".addresses..street"), "second st");
+      assertEquals(flattenedRecord3.get(".addresses..number"), "2");
+      assertEquals(flattenedRecord3.get(".skills.$index"), "1");
+      assertEquals(flattenedRecord3.get(".skills."), "programming");
     }
     {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "{\"name\":\"bob\",\"age\":null,\"addresses\":[{\"country\":\"us\",\"street\":\"main st\"}],\"skills\":[],"
               + "\"hobbies\":[null]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 1);
       Map<String, String> flattenedRecord = flattenedRecords.get(0);
       assertEquals(flattenedRecord.size(), 4);
@@ -232,7 +245,7 @@ public class JsonUtilsTest {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "{\"name\":\"bob\",\"age\":null,\"addresses\":[{\"country\":\"us\",\"street\":\"main st\"}],\"skills\":[],"
               + "\"hobbies\":[null," + "\"football\"]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 1);
       Map<String, String> flattenedRecord = flattenedRecords.get(0);
       assertEquals(flattenedRecord.size(), 6);
@@ -247,33 +260,302 @@ public class JsonUtilsTest {
       JsonNode jsonNode = JsonUtils.stringToJsonNode(
           "{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\","
               + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}");
-      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
       assertEquals(flattenedRecords.size(), 3);
-      Map<String, String> firstFlattenedRecord = flattenedRecords.get(0);
-      assertEquals(firstFlattenedRecord.size(), 6);
-      assertEquals(firstFlattenedRecord.get(".name"), "charles");
-      assertEquals(firstFlattenedRecord.get(".addresses.$index"), "0");
-      assertEquals(firstFlattenedRecord.get(".addresses..country"), "us");
-      assertEquals(firstFlattenedRecord.get(".addresses..street"), "main st");
-      assertEquals(firstFlattenedRecord.get(".addresses..types.$index"), "0");
-      assertEquals(firstFlattenedRecord.get(".addresses..types."), "home");
-      Map<String, String> secondFlattenedRecord = flattenedRecords.get(1);
-      assertEquals(secondFlattenedRecord.size(), 6);
-      assertEquals(secondFlattenedRecord.get(".name"), "charles");
-      assertEquals(secondFlattenedRecord.get(".addresses.$index"), "0");
-      assertEquals(secondFlattenedRecord.get(".addresses..country"), "us");
-      assertEquals(secondFlattenedRecord.get(".addresses..street"), "main st");
-      assertEquals(secondFlattenedRecord.get(".addresses..types.$index"), "1");
-      assertEquals(secondFlattenedRecord.get(".addresses..types."), "office");
-      Map<String, String> thirdFlattenedRecord = flattenedRecords.get(2);
-      assertEquals(thirdFlattenedRecord.size(), 4);
-      assertEquals(thirdFlattenedRecord.get(".name"), "charles");
-      assertEquals(thirdFlattenedRecord.get(".addresses.$index"), "1");
-      assertEquals(thirdFlattenedRecord.get(".addresses..country"), "ca");
-      assertEquals(thirdFlattenedRecord.get(".addresses..street"), "second st");
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.size(), 6);
+      assertEquals(flattenedRecord0.get(".name"), "charles");
+      assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+      assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+      assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+      assertEquals(flattenedRecord0.get(".addresses..types.$index"), "0");
+      assertEquals(flattenedRecord0.get(".addresses..types."), "home");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.size(), 6);
+      assertEquals(flattenedRecord1.get(".name"), "charles");
+      assertEquals(flattenedRecord1.get(".addresses.$index"), "0");
+      assertEquals(flattenedRecord1.get(".addresses..country"), "us");
+      assertEquals(flattenedRecord1.get(".addresses..street"), "main st");
+      assertEquals(flattenedRecord1.get(".addresses..types.$index"), "1");
+      assertEquals(flattenedRecord1.get(".addresses..types."), "office");
+      Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+      assertEquals(flattenedRecord2.size(), 4);
+      assertEquals(flattenedRecord2.get(".name"), "charles");
+      assertEquals(flattenedRecord2.get(".addresses.$index"), "1");
+      assertEquals(flattenedRecord2.get(".addresses..country"), "ca");
+      assertEquals(flattenedRecord2.get(".addresses..street"), "second st");
     }
   }
 
+  @Test
+  public void testFlattenWithMaxLevels()
+      throws IOException {
+    {
+      JsonNode jsonNode = JsonUtils.stringToJsonNode("[1,[2,3],[4,[5,6]]]");
+      JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+
+      jsonIndexConfig.setMaxLevels(3);
+      assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+
+      jsonIndexConfig.setMaxLevels(2);
+      flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+      assertEquals(flattenedRecords.size(), 4);
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.size(), 2);
+      assertEquals(flattenedRecord0.get(".$index"), "0");
+      assertEquals(flattenedRecord0.get("."), "1");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.size(), 3);
+      assertEquals(flattenedRecord1.get(".$index"), "1");
+      assertEquals(flattenedRecord1.get("..$index"), "0");
+      assertEquals(flattenedRecord1.get(".."), "2");
+      Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+      assertEquals(flattenedRecord2.size(), 3);
+      assertEquals(flattenedRecord2.get(".$index"), "1");
+      assertEquals(flattenedRecord2.get("..$index"), "1");
+      assertEquals(flattenedRecord2.get(".."), "3");
+      Map<String, String> flattenedRecord3 = flattenedRecords.get(3);
+      assertEquals(flattenedRecord3.size(), 3);
+      assertEquals(flattenedRecord3.get(".$index"), "2");
+      assertEquals(flattenedRecord3.get("..$index"), "0");
+      assertEquals(flattenedRecord3.get(".."), "4");
+
+      jsonIndexConfig.setMaxLevels(1);
+      flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+      assertEquals(flattenedRecords.size(), 1);
+      Map<String, String> flattenedRecord = flattenedRecords.get(0);
+      assertEquals(flattenedRecord.size(), 2);
+      assertEquals(flattenedRecord.get(".$index"), "0");
+      assertEquals(flattenedRecord.get("."), "1");
+    }
+    {
+      JsonNode jsonNode = JsonUtils.stringToJsonNode(
+          "[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},{\"country\":\"ca\",\"street\":\"second st\","
+              + "\"number\":2}]");
+      JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+
+      jsonIndexConfig.setMaxLevels(2);
+      assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+
+      jsonIndexConfig.setMaxLevels(1);
+      flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+      assertTrue(flattenedRecords.isEmpty());
+    }
+    {
+      JsonNode jsonNode = JsonUtils.stringToJsonNode(
+          "{\"name\":\"adam\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},"
+              + "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}]}");
+      JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+
+      jsonIndexConfig.setMaxLevels(3);
+      assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+
+      jsonIndexConfig.setMaxLevels(2);
+      flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+      assertEquals(flattenedRecords.size(), 1);
+      assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "adam"));
+
+      jsonIndexConfig.setMaxLevels(1);
+      assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+    }
+    {
+      JsonNode jsonNode = JsonUtils.stringToJsonNode(
+          "{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\","
+              + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}");
+      JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+
+      jsonIndexConfig.setMaxLevels(4);
+      assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+
+      jsonIndexConfig.setMaxLevels(3);
+      flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+      assertEquals(flattenedRecords.size(), 2);
+      Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+      assertEquals(flattenedRecord0.size(), 4);
+      assertEquals(flattenedRecord0.get(".name"), "charles");
+      assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+      assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+      assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+      Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+      assertEquals(flattenedRecord1.size(), 4);
+      assertEquals(flattenedRecord1.get(".name"), "charles");
+      assertEquals(flattenedRecord1.get(".addresses.$index"), "1");
+      assertEquals(flattenedRecord1.get(".addresses..country"), "ca");
+      assertEquals(flattenedRecord1.get(".addresses..street"), "second st");
+    }
+  }
+
+  @Test
+  public void testFlattenWithDisableCrossArrayUnnest()
+      throws IOException {
+    JsonNode jsonNode = JsonUtils.stringToJsonNode(
+        "{\"name\":\"adam\",\"age\":20,\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"number\":1},"
+            + "{\"country\":\"ca\",\"street\":\"second st\",\"number\":2}],\"skills\":[\"english\","
+            + "\"programming\"]}");
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setDisableCrossArrayUnnest(true);
+    List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 4);
+    Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+    assertEquals(flattenedRecord0.size(), 6);
+    assertEquals(flattenedRecord0.get(".name"), "adam");
+    assertEquals(flattenedRecord0.get(".age"), "20");
+    assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+    assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+    assertEquals(flattenedRecord0.get(".addresses..number"), "1");
+    Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+    assertEquals(flattenedRecord1.size(), 6);
+    assertEquals(flattenedRecord1.get(".name"), "adam");
+    assertEquals(flattenedRecord1.get(".age"), "20");
+    assertEquals(flattenedRecord1.get(".addresses.$index"), "1");
+    assertEquals(flattenedRecord1.get(".addresses..country"), "ca");
+    assertEquals(flattenedRecord1.get(".addresses..street"), "second st");
+    assertEquals(flattenedRecord1.get(".addresses..number"), "2");
+    Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+    assertEquals(flattenedRecord2.get(".name"), "adam");
+    assertEquals(flattenedRecord2.get(".age"), "20");
+    assertEquals(flattenedRecord2.get(".skills.$index"), "0");
+    assertEquals(flattenedRecord2.get(".skills."), "english");
+    Map<String, String> flattenedRecord3 = flattenedRecords.get(3);
+    assertEquals(flattenedRecord3.get(".name"), "adam");
+    assertEquals(flattenedRecord3.get(".age"), "20");
+    assertEquals(flattenedRecord3.get(".skills.$index"), "1");
+    assertEquals(flattenedRecord3.get(".skills."), "programming");
+  }
+
+  @Test
+  public void testFlattenIncludePaths()
+      throws IOException {
+    JsonNode jsonNode = JsonUtils.stringToJsonNode(
+        "{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\","
+            + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}");
+
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setIncludePaths(Collections.singleton("$.name"));
+    List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 1);
+    assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "charles"));
+
+    jsonIndexConfig.setIncludePaths(Collections.singleton("$.addresses"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 3);
+    Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+    assertEquals(flattenedRecord0.size(), 5);
+    assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+    assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+    assertEquals(flattenedRecord0.get(".addresses..types.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types."), "home");
+    Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+    assertEquals(flattenedRecord1.size(), 5);
+    assertEquals(flattenedRecord1.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord1.get(".addresses..country"), "us");
+    assertEquals(flattenedRecord1.get(".addresses..street"), "main st");
+    assertEquals(flattenedRecord1.get(".addresses..types.$index"), "1");
+    assertEquals(flattenedRecord1.get(".addresses..types."), "office");
+    Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+    assertEquals(flattenedRecord2.size(), 3);
+    assertEquals(flattenedRecord2.get(".addresses.$index"), "1");
+    assertEquals(flattenedRecord2.get(".addresses..country"), "ca");
+    assertEquals(flattenedRecord2.get(".addresses..street"), "second st");
+
+    jsonIndexConfig.setIncludePaths(Collections.singleton("$.addresses[*]"));
+    assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig), flattenedRecords);
+
+    jsonIndexConfig.setIncludePaths(Collections.singleton("$.addresses[*].types"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 2);
+    flattenedRecord0 = flattenedRecords.get(0);
+    assertEquals(flattenedRecord0.size(), 3);
+    assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types."), "home");
+    flattenedRecord1 = flattenedRecords.get(1);
+    assertEquals(flattenedRecord1.size(), 3);
+    assertEquals(flattenedRecord1.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord1.get(".addresses..types.$index"), "1");
+    assertEquals(flattenedRecord1.get(".addresses..types."), "office");
+
+    jsonIndexConfig.setIncludePaths(ImmutableSet.of("$.name", "$.addresses[*].types"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 2);
+    flattenedRecord0 = flattenedRecords.get(0);
+    assertEquals(flattenedRecord0.size(), 4);
+    assertEquals(flattenedRecord0.get(".name"), "charles");
+    assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types."), "home");
+    flattenedRecord1 = flattenedRecords.get(1);
+    assertEquals(flattenedRecord1.size(), 4);
+    assertEquals(flattenedRecord1.get(".name"), "charles");
+    assertEquals(flattenedRecord1.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord1.get(".addresses..types.$index"), "1");
+    assertEquals(flattenedRecord1.get(".addresses..types."), "office");
+
+    jsonIndexConfig.setIncludePaths(Collections.singleton("$.no_match"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertTrue(flattenedRecords.isEmpty());
+  }
+
+  @Test
+  public void testFlattenExclude()
+      throws IOException {
+    JsonNode jsonNode = JsonUtils.stringToJsonNode(
+        "{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main st\",\"types\":[\"home\","
+            + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second st\"}]}");
+
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setExcludeArray(true);
+    List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 1);
+    assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "charles"));
+
+    jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setExcludePaths(Collections.singleton("$.name"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 3);
+    Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+    assertEquals(flattenedRecord0.size(), 5);
+    assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+    assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+    assertEquals(flattenedRecord0.get(".addresses..types.$index"), "0");
+    assertEquals(flattenedRecord0.get(".addresses..types."), "home");
+    Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+    assertEquals(flattenedRecord1.size(), 5);
+    assertEquals(flattenedRecord1.get(".addresses.$index"), "0");
+    assertEquals(flattenedRecord1.get(".addresses..country"), "us");
+    assertEquals(flattenedRecord1.get(".addresses..street"), "main st");
+    assertEquals(flattenedRecord1.get(".addresses..types.$index"), "1");
+    assertEquals(flattenedRecord1.get(".addresses..types."), "office");
+    Map<String, String> flattenedRecord2 = flattenedRecords.get(2);
+    assertEquals(flattenedRecord2.size(), 3);
+    assertEquals(flattenedRecord2.get(".addresses.$index"), "1");
+    assertEquals(flattenedRecord2.get(".addresses..country"), "ca");
+    assertEquals(flattenedRecord2.get(".addresses..street"), "second st");
+
+    jsonIndexConfig.setExcludePaths(Collections.singleton("$.addresses"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 1);
+    assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "charles"));
+
+    jsonIndexConfig.setExcludePaths(Collections.singleton("$.addresses[*]"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 1);
+    assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "charles"));
+
+    jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setExcludeFields(Collections.singleton("addresses"));
+    flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    assertEquals(flattenedRecords.size(), 1);
+    assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name", "charles"));
+  }
+
   @Test
   public void testUnrecognizedJsonProperties()
       throws Exception {
@@ -283,22 +565,22 @@ public class JsonUtilsTest {
     Pair<JsonUtilsTestSamplePojo, Map<String, Object>> parsedResp =
         JsonUtils.stringToObjectAndUnrecognizedProperties(inputJsonMissingProp, JsonUtilsTestSamplePojo.class);
 
-    Assert.assertTrue(parsedResp.getRight().containsKey("/missingProp"));
-    Assert.assertTrue(parsedResp.getRight().containsKey("/missingObjectProp/somestuff"));
-    Assert.assertTrue(parsedResp.getRight().containsKey("/missingObjectProp/somemorestuff"));
-    Assert.assertTrue(parsedResp.getRight().containsKey("/classField/internalMissingField"));
+    assertTrue(parsedResp.getRight().containsKey("/missingProp"));
+    assertTrue(parsedResp.getRight().containsKey("/missingObjectProp/somestuff"));
+    assertTrue(parsedResp.getRight().containsKey("/missingObjectProp/somemorestuff"));
+    assertTrue(parsedResp.getRight().containsKey("/classField/internalMissingField"));
   }
 
   @Test
   public void testInferSchema()
       throws Exception {
     ClassLoader classLoader = JsonUtilsTest.class.getClassLoader();
-    final File file = new File(classLoader.getResource(JSON_FILE).getFile());
+    File file = new File(Objects.requireNonNull(classLoader.getResource(JSON_FILE)).getFile());
     Map<String, FieldSpec.FieldType> fieldSpecMap =
-        new ImmutableMap.Builder<String, FieldSpec.FieldType>().put("d1", FieldSpec.FieldType.DIMENSION)
-            .put("hoursSinceEpoch", FieldSpec.FieldType.DATE_TIME).put("m1", FieldSpec.FieldType.METRIC).build();
-    Schema inferredPinotSchema = JsonUtils
-        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, new ArrayList<>(), ".",
+        ImmutableMap.of("d1", FieldSpec.FieldType.DIMENSION, "hoursSinceEpoch", FieldSpec.FieldType.DATE_TIME, "m1",
+            FieldSpec.FieldType.METRIC);
+    Schema inferredPinotSchema =
+        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, new ArrayList<>(), ".",
             ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE);
     Schema expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
@@ -310,9 +592,9 @@ public class JsonUtilsTest {
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
 
     // unnest collection entries
-    inferredPinotSchema = JsonUtils
-        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList("entries"), ".",
-            ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE);
+    inferredPinotSchema =
+        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Collections.singletonList("entries"),
+            ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE);
     expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
         .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING)
@@ -324,8 +606,8 @@ public class JsonUtilsTest {
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
 
     // change delimiter
-    inferredPinotSchema = JsonUtils
-        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList(""), "_",
+    inferredPinotSchema =
+        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Collections.singletonList(""), "_",
             ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE);
     expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
@@ -337,9 +619,9 @@ public class JsonUtilsTest {
     Assert.assertEquals(inferredPinotSchema, expectedSchema);
 
     // change the handling of collection-to-json option, d2 will become string
-    inferredPinotSchema = JsonUtils
-        .getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList("entries"), ".",
-            ComplexTypeConfig.CollectionNotUnnestedToJson.ALL);
+    inferredPinotSchema =
+        JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Collections.singletonList("entries"),
+            ".", ComplexTypeConfig.CollectionNotUnnestedToJson.ALL);
     expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING)
         .addMetric("m1", FieldSpec.DataType.INT)
         .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org