You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/22 02:37:51 UTC

[pinot] branch master updated: Add help methods to check if segment needs reprocessing (#7925)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 29bcc82  Add help methods to check if segment needs reprocessing (#7925)
29bcc82 is described below

commit 29bcc827bf259157250d5756acbbd38a7296d569
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Dec 21 18:37:30 2021 -0800

    Add help methods to check if segment needs reprocessing (#7925)
---
 .../immutable/ImmutableSegmentLoader.java          |  20 +++
 .../local/segment/index/loader/IndexHandler.java   |  12 +-
 .../segment/index/loader/IndexHandlerFactory.java  |  35 +++---
 .../segment/index/loader/SegmentPreProcessor.java  |  74 ++++++++++-
 .../loader/bloomfilter/BloomFilterHandler.java     |  45 +++----
 .../ColumnMinMaxValueGenerator.java                |  26 +++-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   6 +
 .../loader/defaultcolumn/DefaultColumnHandler.java |   7 ++
 .../loader/invertedindex/FSTIndexHandler.java      |  50 ++++----
 .../index/loader/invertedindex/H3IndexHandler.java |  73 ++++++-----
 .../loader/invertedindex/InvertedIndexHandler.java |  47 +++----
 .../loader/invertedindex/JsonIndexHandler.java     |  60 +++++----
 .../loader/invertedindex/RangeIndexHandler.java    |  66 +++++-----
 .../loader/invertedindex/TextIndexHandler.java     |  62 ++++-----
 .../local/segment/index/loader/LoaderTest.java     |  27 ++++
 .../index/loader/SegmentPreProcessorTest.java      | 138 +++++++++++++++++++--
 .../pinot/segment/spi/store/SegmentDirectory.java  |   8 +-
 17 files changed, 532 insertions(+), 224 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index d1a31cb..164d86a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -175,6 +175,26 @@ public class ImmutableSegmentLoader {
     return segment;
   }
 
+  /**
+   * Check segment directory against the table config and schema to see if any preprocessing is needed,
+   * like changing segment format, adding new indices or updating default columns.
+   */
+  public static boolean needPreprocess(SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig,
+      @Nullable Schema schema)
+      throws Exception {
+    if (needConvertSegmentFormat(indexLoadingConfig, segmentDirectory.getSegmentMetadata())) {
+      return true;
+    }
+    SegmentPreProcessor preProcessor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
+    return preProcessor.needProcess();
+  }
+
+  private static boolean needConvertSegmentFormat(IndexLoadingConfig indexLoadingConfig,
+      SegmentMetadataImpl segmentMetadata) {
+    SegmentVersion segmentVersionToLoad = indexLoadingConfig.getSegmentVersion();
+    return segmentVersionToLoad != null && segmentVersionToLoad != segmentMetadata.getVersion();
+  }
+
   private static void convertSegmentFormat(File indexDir, IndexLoadingConfig indexLoadingConfig,
       SegmentMetadataImpl localSegmentMetadata)
       throws Exception {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
index f0ebe8c..5b2ba47 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.segment.local.segment.index.loader;
 
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+
+
 /**
  * Interface for index handlers, which update the corresponding type of indices,
  * like adding, removing or converting the format.
@@ -26,6 +30,12 @@ public interface IndexHandler {
   /**
    * Adds new indices and removes obsolete indices.
    */
-  void updateIndices()
+  void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception;
+
+  /**
+   * Check if there is a need to add new indices or remove obsolete indices.
+   * @return true if there is a need to update.
+   */
+  boolean needUpdateIndices(SegmentDirectory.Reader segmentReader);
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index d06c1ba..bc48ff9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.segment.local.segment.index.loader;
 
-import java.io.File;
 import org.apache.pinot.segment.local.segment.index.loader.bloomfilter.BloomFilterHandler;
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler;
@@ -27,7 +26,6 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonInd
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
-import org.apache.pinot.segment.spi.index.IndexingOverrides;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -37,31 +35,34 @@ public class IndexHandlerFactory {
   private IndexHandlerFactory() {
   }
 
-  private static final IndexHandler NO_OP_HANDLER = () -> {
+  private static final IndexHandler NO_OP_HANDLER = new IndexHandler() {
+    @Override
+    public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
+    }
+
+    @Override
+    public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+      return false;
+    }
   };
 
-  public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata,
-      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) {
-    IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+  public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadataImpl segmentMetadata,
+      IndexLoadingConfig indexLoadingConfig) {
     switch (type) {
       case INVERTED_INDEX:
-        return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
-            indexCreatorProvider);
+        return new InvertedIndexHandler(segmentMetadata, indexLoadingConfig);
       case RANGE_INDEX:
-        return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
-            indexCreatorProvider);
+        return new RangeIndexHandler(segmentMetadata, indexLoadingConfig);
       case TEXT_INDEX:
-        return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+        return new TextIndexHandler(segmentMetadata, indexLoadingConfig);
       case FST_INDEX:
-        return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
-            indexLoadingConfig.getFSTIndexType(), indexCreatorProvider);
+        return new FSTIndexHandler(segmentMetadata, indexLoadingConfig);
       case JSON_INDEX:
-        return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+        return new JsonIndexHandler(segmentMetadata, indexLoadingConfig);
       case H3_INDEX:
-        return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+        return new H3IndexHandler(segmentMetadata, indexLoadingConfig);
       case BLOOM_FILTER:
-        return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
-            indexCreatorProvider);
+        return new BloomFilterHandler(segmentMetadata, indexLoadingConfig);
       default:
         return NO_OP_HANDLER;
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 82a30a7..47b01ce 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,8 @@ import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
 import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
 import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.IndexingOverrides;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -96,9 +98,10 @@ public class SegmentPreProcessor implements AutoCloseable {
       }
 
       // Update single-column indices, like inverted index, json index etc.
+      IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
       for (ColumnIndexType type : ColumnIndexType.values()) {
-        IndexHandlerFactory.getIndexHandler(type, _indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter)
-            .updateIndices();
+        IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
+            .updateIndices(segmentWriter, indexCreatorProvider);
       }
 
       // Create/modify/remove star-trees if required.
@@ -120,6 +123,73 @@ public class SegmentPreProcessor implements AutoCloseable {
     }
   }
 
+  /**
+   * This method checks if there is any discrepancy between the segment and current table config and schema.
+   * If so, it returns true indicating the segment needs to be reprocessed. Right now, the default columns,
+   * all types of indices and column min/max values are checked against what's set in table config and schema.
+   */
+  public boolean needProcess()
+      throws Exception {
+    if (_segmentMetadata.getTotalDocs() == 0) {
+      return false;
+    }
+    try (SegmentDirectory.Reader segmentReader = _segmentDirectory.createReader()) {
+      // Check if there is a need to update default columns according to the schema.
+      if (_schema != null) {
+        DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
+            .getDefaultColumnHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, _schema, null);
+        if (defaultColumnHandler.needUpdateDefaultColumns()) {
+          return true;
+        }
+      }
+      // Check if there is a need to update single-column indices, like inverted index, json index etc.
+      for (ColumnIndexType type : ColumnIndexType.values()) {
+        if (IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
+            .needUpdateIndices(segmentReader)) {
+          return true;
+        }
+      }
+      // Check if there is a need to create/modify/remove star-trees.
+      if (needProcessStarTrees()) {
+        return true;
+      }
+      // Check if there is a need to update column min/max values.
+      if (needUpdateColumnMinMaxValue()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean needUpdateColumnMinMaxValue() {
+    ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode =
+        _indexLoadingConfig.getColumnMinMaxValueGeneratorMode();
+    if (columnMinMaxValueGeneratorMode == ColumnMinMaxValueGeneratorMode.NONE) {
+      return false;
+    }
+    ColumnMinMaxValueGenerator columnMinMaxValueGenerator =
+        new ColumnMinMaxValueGenerator(_segmentMetadata, null, columnMinMaxValueGeneratorMode);
+    return columnMinMaxValueGenerator.needAddColumnMinMaxValue();
+  }
+
+  private boolean needProcessStarTrees() {
+    // Check if there is a need to create/modify/remove star-trees.
+    if (!_indexLoadingConfig.isEnableDynamicStarTreeCreation()) {
+      return false;
+    }
+    List<StarTreeV2BuilderConfig> starTreeBuilderConfigs = StarTreeBuilderUtils
+        .generateBuilderConfigs(_indexLoadingConfig.getStarTreeIndexConfigs(),
+            _indexLoadingConfig.isEnableDefaultStarTree(), _segmentMetadata);
+    List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList();
+    // There are existing star-trees, but if they match the builder configs exactly,
+    // then there is no need to generate the star-trees
+    if (starTreeMetadataList != null && !StarTreeBuilderUtils
+        .shouldRemoveExistingStarTrees(starTreeBuilderConfigs, starTreeMetadataList)) {
+      return false;
+    }
+    return !starTreeBuilderConfigs.isEmpty();
+  }
+
   private void processStarTrees()
       throws Exception {
     // Create/modify/remove star-trees if required
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index f992344..065f693 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -35,12 +35,13 @@ import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -54,32 +55,32 @@ import org.slf4j.LoggerFactory;
 public class BloomFilterHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterHandler.class);
 
-  private final File _indexDir;
-  private final SegmentMetadataImpl _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
+  private final SegmentMetadata _segmentMetadata;
   private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
-  private final BloomFilterCreatorProvider _indexCreatorProvider;
 
-  public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
-    _segmentWriter = segmentWriter;
+  public BloomFilterHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
     _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> columnsToAddBF = new HashSet<>(_bloomFilterConfigs.keySet());
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
+    return !existingColumns.equals(columnsToAddBF);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Set<String> columnsToAddBF = new HashSet<>(_bloomFilterConfigs.keySet());
     // Remove indices not set in table config any more.
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
     for (String column : existingColumns) {
       if (!columnsToAddBF.remove(column)) {
         LOGGER.info("Removing existing bloom filter from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.BLOOM_FILTER);
+        segmentWriter.removeIndex(column, ColumnIndexType.BLOOM_FILTER);
         LOGGER.info("Removed existing bloom filter from segment: {}, column: {}", segmentName, column);
       }
     }
@@ -87,19 +88,21 @@ public class BloomFilterHandler implements IndexHandler {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
         if (columnMetadata.hasDictionary()) {
-          createBloomFilterForColumn(columnMetadata);
+          createBloomFilterForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
         }
         // TODO: Support raw index
       }
     }
   }
 
-  private void createBloomFilterForColumn(ColumnMetadata columnMetadata)
+  private void createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      BloomFilterCreatorProvider indexCreatorProvider)
       throws Exception {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
     String columnName = columnMetadata.getColumnName();
-    File bloomFilterFileInProgress = new File(_indexDir, columnName + ".bloom.inprogress");
-    File bloomFilterFile = new File(_indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+    File bloomFilterFileInProgress = new File(indexDir, columnName + ".bloom.inprogress");
+    File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
 
     if (!bloomFilterFileInProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -115,10 +118,10 @@ public class BloomFilterHandler implements IndexHandler {
     BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
     LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName,
         bloomFilterConfig);
-    try (BloomFilterCreator bloomFilterCreator = _indexCreatorProvider.newBloomFilterCreator(
-        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+    try (BloomFilterCreator bloomFilterCreator = indexCreatorProvider.newBloomFilterCreator(
+        IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
             .build().forBloomFilter(bloomFilterConfig));
-        Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
+        Dictionary dictionary = getDictionaryReader(columnMetadata, segmentWriter)) {
       int length = dictionary.length();
       for (int i = 0; i < length; i++) {
         bloomFilterCreator.add(dictionary.getStringValue(i));
@@ -128,7 +131,7 @@ public class BloomFilterHandler implements IndexHandler {
 
     // For v3, write the generated bloom filter file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
     }
 
     // Delete the marker file.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index da42aab..36cf3fd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -56,10 +56,25 @@ public class ColumnMinMaxValueGenerator {
     _columnMinMaxValueGeneratorMode = columnMinMaxValueGeneratorMode;
   }
 
+  public boolean needAddColumnMinMaxValue() {
+    for (String column : getColumnsToAddMinMaxValue()) {
+      if (needAddColumnMinMaxValueForColumn(column)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void addColumnMinMaxValue()
       throws Exception {
     Preconditions.checkState(_columnMinMaxValueGeneratorMode != ColumnMinMaxValueGeneratorMode.NONE);
+    for (String column : getColumnsToAddMinMaxValue()) {
+      addColumnMinMaxValueForColumn(column);
+    }
+    saveMetadata();
+  }
 
+  private Set<String> getColumnsToAddMinMaxValue() {
     Schema schema = _segmentMetadata.getSchema();
     Set<String> columnsToAddMinMaxValue = new HashSet<>(schema.getPhysicalColumnNames());
 
@@ -77,10 +92,13 @@ public class ColumnMinMaxValueGenerator {
       default:
         break;
     }
-    for (String column : columnsToAddMinMaxValue) {
-      addColumnMinMaxValueForColumn(column);
-    }
-    saveMetadata();
+    return columnsToAddMinMaxValue;
+  }
+
+  private boolean needAddColumnMinMaxValueForColumn(String columnName) {
+    ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
+    return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == null
+        && columnMetadata.getMaxValue() == null;
   }
 
   private void addColumnMinMaxValueForColumn(String columnName)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 6e791cc..45c0554 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -126,6 +126,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     _segmentProperties = SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
   }
 
+  @Override
+  public boolean needUpdateDefaultColumns() {
+    Map<String, DefaultColumnAction> defaultColumnActionMap = computeDefaultColumnActionMap();
+    return !defaultColumnActionMap.isEmpty();
+  }
+
   /**
    * {@inheritDoc}
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
index 9d1baae..88db502 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
@@ -28,4 +28,11 @@ public interface DefaultColumnHandler {
    */
   void updateDefaultColumns()
       throws Exception;
+
+  /**
+   * Check if there is a need to add/remove/update the auto-generated default columns
+   * for the segment, according to the current table schema.
+   * @return true if there is a need to update.
+   */
+  boolean needUpdateDefaultColumns();
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 292bf0c..c594f87 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
@@ -66,33 +67,32 @@ import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EX
 public class FSTIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(FSTIndexHandler.class);
 
-  private final File _indexDir;
   private final SegmentMetadata _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
   private final FSTType _fstType;
-  private final TextIndexCreatorProvider _indexCreatorProvider;
 
-  public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, FSTType fstType, TextIndexCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
+  public FSTIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
+    _fstType = indexLoadingConfig.getFSTIndexType();
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
-    _fstType = fstType;
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
+    return !existingColumns.equals(_columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
     for (String column : existingColumns) {
       if (!_columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing FST index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.FST_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.FST_INDEX);
         LOGGER.info("Removed existing FST index from segment: {}, column: {}", segmentName, column);
       }
     }
@@ -100,7 +100,7 @@ public class FSTIndexHandler implements IndexHandler {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
         checkUnsupportedOperationsForFSTIndex(columnMetadata);
-        createFSTIndexForColumn(columnMetadata);
+        createFSTIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
@@ -121,12 +121,14 @@ public class FSTIndexHandler implements IndexHandler {
     }
   }
 
-  private void createFSTIndexForColumn(ColumnMetadata columnMetadata)
+  private void createFSTIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      TextIndexCreatorProvider indexCreatorProvider)
       throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
-    String column = columnMetadata.getColumnName();
-    File inProgress = new File(_indexDir, column + ".fst.inprogress");
-    File fstIndexFile = new File(_indexDir, column + FST_INDEX_FILE_EXTENSION);
+    String columnName = columnMetadata.getColumnName();
+    File inProgress = new File(indexDir, columnName + ".fst.inprogress");
+    File fstIndexFile = new File(indexDir, columnName + FST_INDEX_FILE_EXTENSION);
 
     if (!inProgress.exists()) {
       // Create a marker file.
@@ -135,14 +137,14 @@ public class FSTIndexHandler implements IndexHandler {
       FileUtils.deleteQuietly(fstIndexFile);
     }
 
-    LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName,
+    LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", columnName, segmentName,
         columnMetadata.getCardinality());
 
-    TextIndexCreator fstIndexCreator = _indexCreatorProvider.newTextIndexCreator(
-        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
-            .build().forFSTIndex(_fstType, null));
+    TextIndexCreator fstIndexCreator = indexCreatorProvider.newTextIndexCreator(
+        IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+            .forFSTIndex(_fstType, null));
 
-    try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+    try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
       for (int dictId = 0; dictId < dictionary.length(); dictId++) {
         fstIndexCreator.add(dictionary.getStringValue(dictId));
       }
@@ -151,11 +153,11 @@ public class FSTIndexHandler implements IndexHandler {
 
     // For v3, write the generated FST index file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, fstIndexFile, ColumnIndexType.FST_INDEX);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, fstIndexFile, ColumnIndexType.FST_INDEX);
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(inProgress);
-    LOGGER.info("Created FST index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Created FST index for segment: {}, column: {}", segmentName, columnName);
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index d2fceaa..e58ccef 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -25,19 +25,19 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.GeoSpatialIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -52,49 +52,51 @@ import org.slf4j.LoggerFactory;
 public class H3IndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class);
 
-  private final File _indexDir;
-  private final SegmentMetadataImpl _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
+  private final SegmentMetadata _segmentMetadata;
   private final Map<String, H3IndexConfig> _h3Configs;
-  private final IndexCreatorProvider _indexCreatorProvider;
 
-  public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
+  public H3IndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
     _h3Configs = indexLoadingConfig.getH3IndexConfigs();
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> columnsToAddIdx = new HashSet<>(_h3Configs.keySet());
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
+    return !existingColumns.equals(columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Set<String> columnsToAddIdx = new HashSet<>(_h3Configs.keySet());
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
     for (String column : existingColumns) {
       if (!columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing H3 index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.H3_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.H3_INDEX);
         LOGGER.info("Removed existing H3 index from segment: {}, column: {}", segmentName, column);
       }
     }
     for (String column : columnsToAddIdx) {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
-        createH3IndexForColumn(columnMetadata);
+        createH3IndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
 
-  private void createH3IndexForColumn(ColumnMetadata columnMetadata)
+  private void createH3IndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      GeoSpatialIndexCreatorProvider indexCreatorProvider)
       throws Exception {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
-    String column = columnMetadata.getColumnName();
-    File inProgress = new File(_indexDir, column + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION + ".inprogress");
-    File h3IndexFile = new File(_indexDir, column + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
+    String columnName = columnMetadata.getColumnName();
+    File inProgress = new File(indexDir, columnName + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION + ".inprogress");
+    File h3IndexFile = new File(indexDir, columnName + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
 
     if (!inProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -108,34 +110,36 @@ public class H3IndexHandler implements IndexHandler {
     }
 
     // Create new H3 index for the column.
-    LOGGER.info("Creating new H3 index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Creating new H3 index for segment: {}, column: {}", segmentName, columnName);
     Preconditions
         .checkState(columnMetadata.getDataType() == DataType.BYTES, "H3 index can only be applied to BYTES columns");
     if (columnMetadata.hasDictionary()) {
-      handleDictionaryBasedColumn(columnMetadata);
+      handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     } else {
-      handleNonDictionaryBasedColumn(columnMetadata);
+      handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     }
 
     // For v3, write the generated H3 index file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, h3IndexFile, ColumnIndexType.H3_INDEX);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, h3IndexFile, ColumnIndexType.H3_INDEX);
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(inProgress);
 
-    LOGGER.info("Created H3 index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Created H3 index for segment: {}, column: {}", segmentName, columnName);
   }
 
-  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      GeoSpatialIndexCreatorProvider indexCreatorProvider)
       throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
     String columnName = columnMetadata.getColumnName();
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
-        GeoSpatialIndexCreator h3IndexCreator = _indexCreatorProvider.newGeoSpatialIndexCreator(
-            IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+        Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata);
+        GeoSpatialIndexCreator h3IndexCreator = indexCreatorProvider.newGeoSpatialIndexCreator(
+            IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
                 .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
@@ -146,13 +150,16 @@ public class H3IndexHandler implements IndexHandler {
     }
   }
 
-  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      GeoSpatialIndexCreatorProvider indexCreatorProvider)
       throws Exception {
+    File indexDir = _segmentMetadata.getIndexDir();
     String columnName = columnMetadata.getColumnName();
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        OffHeapH3IndexCreator h3IndexCreator = new OffHeapH3IndexCreator(_indexDir, columnName,
-            _h3Configs.get(columnName).getResolution())) {
+        GeoSpatialIndexCreator h3IndexCreator = indexCreatorProvider.newGeoSpatialIndexCreator(
+            IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+                .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         h3IndexCreator.add(GeometrySerializer.deserialize(forwardIndexReader.getBytes(i, readerContext)));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 693ca32..91d3ae7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -45,32 +46,32 @@ import org.slf4j.LoggerFactory;
 public class InvertedIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(InvertedIndexHandler.class);
 
-  private final File _indexDir;
   private final SegmentMetadata _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
   private final HashSet<String> _columnsToAddIdx;
-  private final InvertedIndexCreatorProvider _indexCreatorProvider;
 
-  public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
+  public InvertedIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getInvertedIndexColumns());
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> existingColumns =
+        segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
+    return !existingColumns.equals(_columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws IOException {
     // Remove indices not set in table config any more.
     String segmentName = _segmentMetadata.getName();
     Set<String> existingColumns =
-        _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
+        segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
     for (String column : existingColumns) {
       if (!_columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing inverted index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.INVERTED_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.INVERTED_INDEX);
         LOGGER.info("Removed existing inverted index from segment: {}, column: {}", segmentName, column);
       }
     }
@@ -78,17 +79,19 @@ public class InvertedIndexHandler implements IndexHandler {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       // Only create inverted index on dictionary-encoded unsorted columns.
       if (columnMetadata != null && !columnMetadata.isSorted() && columnMetadata.hasDictionary()) {
-        createInvertedIndexForColumn(columnMetadata);
+        createInvertedIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
 
-  private void createInvertedIndexForColumn(ColumnMetadata columnMetadata)
+  private void createInvertedIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      InvertedIndexCreatorProvider indexCreatorProvider)
       throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
-    String column = columnMetadata.getColumnName();
-    File inProgress = new File(_indexDir, column + ".inv.inprogress");
-    File invertedIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
+    String columnName = columnMetadata.getColumnName();
+    File inProgress = new File(indexDir, columnName + ".inv.inprogress");
+    File invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
 
     if (!inProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -102,12 +105,12 @@ public class InvertedIndexHandler implements IndexHandler {
     }
 
     // Create new inverted index for the column.
-    LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, columnName);
     int numDocs = columnMetadata.getTotalDocs();
-    try (DictionaryBasedInvertedIndexCreator creator = _indexCreatorProvider.newInvertedIndexCreator(
-        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+    try (DictionaryBasedInvertedIndexCreator creator = indexCreatorProvider.newInvertedIndexCreator(
+        IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
             .forInvertedIndex())) {
-      try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+      try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
           ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
         if (columnMetadata.isSingleValue()) {
           // Single-value column.
@@ -128,12 +131,12 @@ public class InvertedIndexHandler implements IndexHandler {
 
     // For v3, write the generated inverted index file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, invertedIndexFile, ColumnIndexType.INVERTED_INDEX);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, invertedIndexFile, ColumnIndexType.INVERTED_INDEX);
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(inProgress);
 
-    LOGGER.info("Created inverted index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Created inverted index for segment: {}, column: {}", segmentName, columnName);
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 27b8ade..c114247 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
@@ -48,48 +49,49 @@ import org.slf4j.LoggerFactory;
 public class JsonIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(JsonIndexHandler.class);
 
-  private final File _indexDir;
   private final SegmentMetadata _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
   private final HashSet<String> _columnsToAddIdx;
-  private final JsonIndexCreatorProvider _indexCreatorProvider;
 
-  public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, JsonIndexCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
+  public JsonIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns());
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
+    return !existingColumns.equals(_columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
     for (String column : existingColumns) {
       if (!_columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing json index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.JSON_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.JSON_INDEX);
         LOGGER.info("Removed existing json index from segment: {}, column: {}", segmentName, column);
       }
     }
     for (String column : _columnsToAddIdx) {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
-        createJsonIndexForColumn(columnMetadata);
+        createJsonIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
 
-  private void createJsonIndexForColumn(ColumnMetadata columnMetadata)
+  private void createJsonIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      JsonIndexCreatorProvider indexCreatorProvider)
       throws Exception {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
     String columnName = columnMetadata.getColumnName();
-    File inProgress = new File(_indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION + ".inprogress");
-    File jsonIndexFile = new File(_indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    File inProgress = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION + ".inprogress");
+    File jsonIndexFile = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
 
     if (!inProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -108,14 +110,14 @@ public class JsonIndexHandler implements IndexHandler {
             || columnMetadata.getDataType() == DataType.JSON),
         "Json index can only be applied to single-value STRING or JSON columns");
     if (columnMetadata.hasDictionary()) {
-      handleDictionaryBasedColumn(columnMetadata);
+      handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     } else {
-      handleNonDictionaryBasedColumn(columnMetadata);
+      handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     }
 
     // For v3, write the generated json index file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, jsonIndexFile, ColumnIndexType.JSON_INDEX);
     }
 
     // Delete the marker file.
@@ -124,13 +126,15 @@ public class JsonIndexHandler implements IndexHandler {
     LOGGER.info("Created json index for segment: {}, column: {}", segmentName, columnName);
   }
 
-  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      JsonIndexCreatorProvider indexCreatorProvider)
       throws IOException {
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    File indexDir = _segmentMetadata.getIndexDir();
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
-        JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
-            .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+        Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata);
+        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+            .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -140,12 +144,14 @@ public class JsonIndexHandler implements IndexHandler {
     }
   }
 
-  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      JsonIndexCreatorProvider indexCreatorProvider)
       throws IOException {
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    File indexDir = _segmentMetadata.getIndexDir();
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
-            .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+        JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+            .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 3d5551d..95745aa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
@@ -45,33 +46,32 @@ import org.slf4j.LoggerFactory;
 public class RangeIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexHandler.class);
 
-  private final File _indexDir;
   private final SegmentMetadata _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
   private final int _rangeIndexVersion;
-  private final RangeIndexCreatorProvider _indexCreatorProvider;
 
-  public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider indexCreatorProvider) {
-    _indexDir = indexDir;
+  public RangeIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getRangeIndexColumns());
     _rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion();
-    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
+    return !existingColumns.equals(_columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws IOException {
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
     for (String column : existingColumns) {
       if (!_columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing range index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
         LOGGER.info("Removed existing range index from segment: {}, column: {}", segmentName, column);
       }
     }
@@ -79,17 +79,19 @@ public class RangeIndexHandler implements IndexHandler {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       // Only create range index on unsorted columns (dictionary-encoded or raw)
       if (columnMetadata != null && !columnMetadata.isSorted()) {
-        createRangeIndexForColumn(columnMetadata);
+        createRangeIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
 
-  private void createRangeIndexForColumn(ColumnMetadata columnMetadata)
+  private void createRangeIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      RangeIndexCreatorProvider indexCreatorProvider)
       throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
-    String column = columnMetadata.getColumnName();
-    File inProgress = new File(_indexDir, column + ".range.inprogress");
-    File rangeIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    String columnName = columnMetadata.getColumnName();
+    File inProgress = new File(indexDir, columnName + ".range.inprogress");
+    File rangeIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
 
     if (!inProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -103,30 +105,31 @@ public class RangeIndexHandler implements IndexHandler {
     }
 
     // Create new range index for the column.
-    LOGGER.info("Creating new range index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Creating new range index for segment: {}, column: {}", segmentName, columnName);
     if (columnMetadata.hasDictionary()) {
-      handleDictionaryBasedColumn(columnMetadata);
+      handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     } else {
-      handleNonDictionaryBasedColumn(columnMetadata);
+      handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
     }
 
     // For v3, write the generated range index file into the single file and remove it.
     if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(inProgress);
 
-    LOGGER.info("Created range index for segment: {}, column: {}", segmentName, column);
+    LOGGER.info("Created range index for segment: {}, column: {}", segmentName, columnName);
   }
 
-  private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      RangeIndexCreatorProvider indexCreatorProvider)
       throws IOException {
     int numDocs = columnMetadata.getTotalDocs();
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
+        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, indexCreatorProvider)) {
       if (columnMetadata.isSingleValue()) {
         // Single-value column
         for (int i = 0; i < numDocs; i++) {
@@ -144,12 +147,13 @@ public class RangeIndexHandler implements IndexHandler {
     }
   }
 
-  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+  private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      RangeIndexCreatorProvider indexCreatorProvider)
       throws IOException {
     int numDocs = columnMetadata.getTotalDocs();
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
+        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, indexCreatorProvider)) {
       if (columnMetadata.isSingleValue()) {
         // Single-value column.
         switch (columnMetadata.getDataType()) {
@@ -216,10 +220,12 @@ public class RangeIndexHandler implements IndexHandler {
     }
   }
 
-  private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata)
+  private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata,
+      RangeIndexCreatorProvider indexCreatorProvider)
       throws IOException {
-    return _indexCreatorProvider.newRangeIndexCreator(
-        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+    File indexDir = _segmentMetadata.getIndexDir();
+    return indexCreatorProvider.newRangeIndexCreator(
+        IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
             .forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(), columnMetadata.getMaxValue()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 6e7c64e..9f81933 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -82,31 +83,30 @@ import org.slf4j.LoggerFactory;
 public class TextIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(TextIndexHandler.class);
 
-  private final File _indexDir;
   private final SegmentMetadata _segmentMetadata;
-  private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
-  private final TextIndexCreatorProvider _textIndexCreatorProvider;
 
-  public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider textIndexCreatorProvider) {
-    _indexDir = indexDir;
+  public TextIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
     _segmentMetadata = segmentMetadata;
-    _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns());
-    _textIndexCreatorProvider = textIndexCreatorProvider;
   }
 
   @Override
-  public void updateIndices()
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+    Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
+    return !existingColumns.equals(_columnsToAddIdx);
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     // Remove indices not set in table config any more
     String segmentName = _segmentMetadata.getName();
-    Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
+    Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
     for (String column : existingColumns) {
       if (!_columnsToAddIdx.remove(column)) {
         LOGGER.info("Removing existing text index from segment: {}, column: {}", segmentName, column);
-        _segmentWriter.removeIndex(column, ColumnIndexType.TEXT_INDEX);
+        segmentWriter.removeIndex(column, ColumnIndexType.TEXT_INDEX);
         LOGGER.info("Removed existing text index from segment: {}, column: {}", segmentName, column);
       }
     }
@@ -114,7 +114,7 @@ public class TextIndexHandler implements IndexHandler {
       ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
       if (columnMetadata != null) {
         checkUnsupportedOperationsForTextIndex(columnMetadata);
-        createTextIndexForColumn(columnMetadata);
+        createTextIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
       }
     }
   }
@@ -131,38 +131,42 @@ public class TextIndexHandler implements IndexHandler {
     }
   }
 
-  private void createTextIndexForColumn(ColumnMetadata columnMetadata)
+  private void createTextIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+      TextIndexCreatorProvider indexCreatorProvider)
       throws Exception {
+    File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
-    String column = columnMetadata.getColumnName();
+    String columnName = columnMetadata.getColumnName();
     int numDocs = columnMetadata.getTotalDocs();
     boolean hasDictionary = columnMetadata.hasDictionary();
-    LOGGER.info("Creating new text index for column: {} in segment: {}, hasDictionary: {}", column, segmentName,
+    LOGGER.info("Creating new text index for column: {} in segment: {}, hasDictionary: {}", columnName, segmentName,
         hasDictionary);
-    File segmentDirectory = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, _segmentMetadata.getVersion());
+    File segmentDirectory = SegmentDirectoryPaths.segmentDirectoryFor(indexDir, _segmentMetadata.getVersion());
     // The handlers are always invoked by the preprocessor. Before this ImmutableSegmentLoader would have already
     // up-converted the segment from v1/v2 -> v3 (if needed). So based on the segmentVersion, whatever segment
     // segmentDirectory is indicated to us by SegmentDirectoryPaths, we create lucene index there. There is no
     // further need to move around the lucene index directory since it is created with correct directory structure
     // based on segmentVersion.
-    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        TextIndexCreator textIndexCreator = _textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
+        TextIndexCreator textIndexCreator = indexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
             .withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true))) {
       if (columnMetadata.isSingleValue()) {
-        processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
+        processSVField(segmentWriter, hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs,
+            columnMetadata);
       } else {
-        processMVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
+        processMVField(segmentWriter, hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs,
+            columnMetadata);
       }
       textIndexCreator.seal();
     }
 
-    LOGGER.info("Created text index for column: {} in segment: {}", column, segmentName);
+    LOGGER.info("Created text index for column: {} in segment: {}", columnName, segmentName);
   }
 
-  private void processSVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
-      ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
-      ColumnMetadata columnMetadata)
+  private void processSVField(SegmentDirectory.Writer segmentWriter, boolean hasDictionary,
+      ForwardIndexReader forwardIndexReader, ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator,
+      int numDocs, ColumnMetadata columnMetadata)
       throws IOException {
     if (!hasDictionary) {
       // text index on raw column, just read the raw forward index
@@ -173,7 +177,7 @@ public class TextIndexHandler implements IndexHandler {
       // text index on dictionary encoded SV column
       // read forward index to get dictId
       // read the raw value from dictionary using dictId
-      try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+      try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
         for (int docId = 0; docId < numDocs; docId++) {
           int dictId = forwardIndexReader.getDictId(docId, readerContext);
           textIndexCreator.add(dictionary.getStringValue(dictId));
@@ -182,9 +186,9 @@ public class TextIndexHandler implements IndexHandler {
     }
   }
 
-  private void processMVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
-      ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
-      ColumnMetadata columnMetadata)
+  private void processMVField(SegmentDirectory.Writer segmentWriter, boolean hasDictionary,
+      ForwardIndexReader forwardIndexReader, ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator,
+      int numDocs, ColumnMetadata columnMetadata)
       throws IOException {
     if (!hasDictionary) {
       // text index on raw column, just read the raw forward index
@@ -197,7 +201,7 @@ public class TextIndexHandler implements IndexHandler {
       // text index on dictionary encoded MV column
       // read forward index to get dictId
       // read the raw value from dictionary using dictId
-      try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+      try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
         int maxNumEntries = columnMetadata.getMaxNumberOfMultiValues();
         int[] dictIdBuffer = new int[maxNumEntries];
         String[] valueBuffer = new String[maxNumEntries];
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index 820317d..8ec95e0 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -62,6 +62,8 @@ import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
 import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
 public class LoaderTest {
@@ -144,6 +146,31 @@ public class LoaderTest {
     Assert.assertFalse(v3TempDir.exists());
   }
 
+  @Test
+  public void testIfNeedConvertSegmentFormat()
+      throws Exception {
+    constructV1Segment();
+
+    // The newly generated segment is consistent with the table config and schema, so in the
+    // following checks, whether it needs reprocessing depends only on the segment format.
+    SegmentDirectory segmentDir = _localSegmentDirectoryLoader.load(_indexDir.toURI(),
+        new SegmentDirectoryLoaderContext(null, null, null, _pinotConfiguration));
+
+    // The segmentVersionToLoad is null, so no reprocessing is needed.
+    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, new IndexLoadingConfig(), null));
+
+    // The segmentVersionToLoad is v1, so the v1 segment needs no reprocessing.
+    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, _v1IndexLoadingConfig, null));
+
+    // The segmentVersionToLoad is v3, so the v1 segment needs reprocessing.
+    assertTrue(ImmutableSegmentLoader.needPreprocess(segmentDir, _v3IndexLoadingConfig, null));
+
+    // The segment is in v3 format now, so no reprocessing is needed.
+    ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig);
+    segmentDir.reloadMetadata();
+    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, _v3IndexLoadingConfig, null));
+  }
+
   private void testConversion()
       throws Exception {
     // Do not set segment version, should not convert the segment
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index f985ba9..df231b3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -49,6 +50,7 @@ import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -783,16 +785,7 @@ public class SegmentPreProcessorTest {
     constructV1Segment();
 
     // Remove min/max value from the metadata
-    PropertiesConfiguration configuration = SegmentMetadataImpl.getPropertiesConfiguration(_indexDir);
-    Iterator<String> keys = configuration.getKeys();
-    while (keys.hasNext()) {
-      String key = keys.next();
-      if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
-          V1Constants.MetadataKeys.Column.MAX_VALUE)) {
-        configuration.clearProperty(key);
-      }
-    }
-    configuration.save();
+    removeMinMaxValuesFromMetadataFile(_indexDir);
 
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
     indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE);
@@ -1080,4 +1073,129 @@ public class SegmentPreProcessorTest {
     }
     assertEquals(singleFileIndex.length(), initFileSize);
   }
+
+  @Test
+  public void testV1IfNeedProcess()
+      throws Exception {
+    constructV1Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
+
+    testIfNeedProcess();
+  }
+
+  @Test
+  public void testV3IfNeedProcess()
+      throws Exception {
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+
+    testIfNeedProcess();
+  }
+
+  private void testIfNeedProcess()
+      throws Exception {
+    // The segment has a few indices initially; an empty IndexLoadingConfig requires them to be removed.
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
+      assertTrue(processor.needProcess());
+      processor.process();
+      assertFalse(processor.needProcess());
+    }
+
+    // The new schema requires some default columns to be added.
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(),
+            _newColumnsSchemaWithH3Json)) {
+      assertTrue(processor.needProcess());
+      processor.process();
+      assertFalse(processor.needProcess());
+    }
+
+    // Different types of indices need to be added. Add one new index at a time
+    // to test the index handlers separately.
+    IndexLoadingConfig config = new IndexLoadingConfig();
+    for (Runnable prepFunc : createConfigPrepFunctions(config)) {
+      prepFunc.run();
+      try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+          .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+          SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, config,
+              _newColumnsSchemaWithH3Json)) {
+        assertTrue(processor.needProcess());
+        processor.process();
+        assertFalse(processor.needProcess());
+      }
+    }
+
+    // The star-tree index needs to be added.
+    IndexingConfig indexingConfig = new IndexingConfig();
+    indexingConfig.setEnableDynamicStarTreeCreation(true);
+    indexingConfig.setEnableDefaultStarTree(true);
+    _tableConfig.setIndexingConfig(indexingConfig);
+    IndexLoadingConfig configWithStarTreeIndex = new IndexLoadingConfig(null, _tableConfig);
+    createConfigPrepFunctions(configWithStarTreeIndex).forEach(Runnable::run);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
+            _newColumnsSchemaWithH3Json)) {
+      assertTrue(processor.needProcess());
+      processor.process();
+      assertFalse(processor.needProcess());
+    }
+
+    // The min and max values need to be regenerated after being removed from the metadata.
+    removeMinMaxValuesFromMetadataFile(_indexDir);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
+      assertNull(v.getMinValue(), "checking column: " + k);
+      assertNull(v.getMaxValue(), "checking column: " + k);
+    });
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
+            _newColumnsSchemaWithH3Json)) {
+      assertTrue(processor.needProcess());
+      processor.process();
+    }
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
+      if (v.hasDictionary()) {
+        assertNotNull(v.getMinValue(), "checking column: " + k);
+        assertNotNull(v.getMaxValue(), "checking column: " + k);
+      } else {
+        assertNull(v.getMinValue(), "checking column: " + k);
+        assertNull(v.getMaxValue(), "checking column: " + k);
+      }
+    });
+  }
+
+  private static void removeMinMaxValuesFromMetadataFile(File indexDir)
+      throws Exception {
+    PropertiesConfiguration configuration = SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
+    Iterator<String> keys = configuration.getKeys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
+          V1Constants.MetadataKeys.Column.MAX_VALUE)) {
+        configuration.clearProperty(key);
+      }
+    }
+    configuration.save();
+  }
+
+  private static List<Runnable> createConfigPrepFunctions(IndexLoadingConfig config) {
+    return Arrays.asList(
+        () -> config.setInvertedIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+        () -> config.setRangeIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+        () -> config.setTextIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+        () -> config.setFSTIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+        () -> config.setBloomFilterConfigs(ImmutableMap.of("column3", new BloomFilterConfig(0.1, 1024, true))),
+        () -> config
+            .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))),
+        () -> config.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")))
+    );
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 60de836..7f0dc76 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -157,6 +157,10 @@ public abstract class SegmentDirectory implements Closeable {
 
     public abstract boolean hasIndexFor(String column, ColumnIndexType type);
 
+    public SegmentDirectory toSegmentDirectory() {
+      return SegmentDirectory.this;
+    }
+
     public abstract String toString();
   }
 
@@ -192,10 +196,6 @@ public abstract class SegmentDirectory implements Closeable {
     public abstract void save()
         throws IOException;
 
-    public SegmentDirectory toSegmentDirectory() {
-      return SegmentDirectory.this;
-    }
-
     public abstract String toString();
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org