Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/29 21:06:49 UTC

[pinot] branch master updated: ForwardIndexHandler: Change compressionType during segmentReload (#9454)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e917bdc5b7 ForwardIndexHandler: Change compressionType during segmentReload (#9454)
e917bdc5b7 is described below

commit e917bdc5b71df66f46cb805d3a409b7850f2b7a0
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Thu Sep 29 14:06:41 2022 -0700

    ForwardIndexHandler: Change compressionType during segmentReload (#9454)
    
    * ForwardIndexHandler: Allow changing compressionType for SV columns on reload
    
    * Address review comments.
    
    * Add more tests and address review comments
---
 .../segment/index/loader/ForwardIndexHandler.java  | 273 ++++++++++++
 .../local/segment/index/loader/IndexHandler.java   |   3 +-
 .../segment/index/loader/IndexHandlerFactory.java  |   2 +
 .../segment/index/loader/IndexLoadingConfig.java   |  55 +++
 .../segment/index/loader/SegmentPreProcessor.java  |   4 +-
 .../forward/BaseChunkForwardIndexReader.java       |  20 +-
 .../index/loader/ForwardIndexHandlerTest.java      | 475 +++++++++++++++++++++
 .../index/loader/SegmentPreProcessorTest.java      | 154 +++++--
 .../spi/index/reader/ForwardIndexReader.java       |  19 +
 .../pinot/segment/spi/store/ColumnIndexType.java   |   3 +
 10 files changed, 969 insertions(+), 39 deletions(-)
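
The change is driven entirely by table config: a reload rewrites a raw column's
forward index only when the fieldConfig names a compression codec that differs
from the one recorded in the segment. As a minimal sketch of the config side
(not part of this diff; "myRawColumn" and "myTable" are hypothetical names, the
classes are Pinot's existing config model as exercised by the tests below):

    import java.util.Collections;
    import org.apache.pinot.spi.config.table.FieldConfig;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class CompressionChangeSketch {
      public static TableConfig buildTableConfig() {
        // Request ZSTANDARD for the raw single-value column. On the next
        // segment reload, ForwardIndexHandler compares this override against
        // the codec stored in the forward index header and rewrites the index
        // only when they differ.
        FieldConfig fieldConfig = new FieldConfig("myRawColumn", FieldConfig.EncodingType.RAW,
            Collections.emptyList(), FieldConfig.CompressionCodec.ZSTANDARD, null);
        return new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
            .setNoDictionaryColumns(Collections.singletonList("myRawColumn"))
            .setFieldConfigList(Collections.singletonList(fieldConfig)).build();
      }
    }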

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
new file mode 100644
index 0000000000..d9c317e2e6
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. Note
+ * that this handler only works for segment versions >= 3.0. Support for segment versions < 3.0 is not added because
+ * the majority of use cases are on versions >= 3.0, and this avoids adding tech debt. The currently supported
+ * operations are:
+ * 1. Change compression on raw SV columns.
+ *
+ *  TODO: Add support for the following:
+ *  1. Change compression for raw MV columns
+ *  2. Enable dictionary
+ *  3. Disable dictionary
+ */
+public class ForwardIndexHandler implements IndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);
+
+  private final SegmentMetadata _segmentMetadata;
+  private final IndexLoadingConfig _indexLoadingConfig;
+
+  protected enum Operation {
+    // TODO: Add other operations like ENABLE_DICTIONARY, DISABLE_DICTIONARY.
+    CHANGE_RAW_INDEX_COMPRESSION_TYPE,
+  }
+
+  public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
+    _segmentMetadata = segmentMetadata;
+    _indexLoadingConfig = indexLoadingConfig;
+  }
+
+  @Override
+  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
+      throws Exception {
+    Map<String, Operation> columnOperationMap = computeOperation(segmentReader);
+    return !columnOperationMap.isEmpty();
+  }
+
+  @Override
+  public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Map<String, Operation> columnOperationMap = computeOperation(segmentWriter);
+    if (columnOperationMap.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<String, Operation> entry : columnOperationMap.entrySet()) {
+      String column = entry.getKey();
+      Operation operation = entry.getValue();
+
+      switch (operation) {
+        case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
+          rewriteRawForwardIndex(column, segmentWriter, indexCreatorProvider);
+          break;
+        // TODO: Add other operations here.
+        default:
+          throw new IllegalStateException("Unsupported operation for column " + column);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
+      throws Exception {
+    Map<String, Operation> columnOperationMap = new HashMap<>();
+
+    // Does not work for segment versions < V3
+    if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
+      return columnOperationMap;
+    }
+
+    // From existing column config.
+    Set<String> existingAllColumns = _segmentMetadata.getAllColumns();
+    Set<String> existingDictColumns =
+        segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.DICTIONARY);
+    Set<String> existingNoDictColumns = new HashSet<>();
+    for (String column : existingAllColumns) {
+      if (!existingDictColumns.contains(column)) {
+        existingNoDictColumns.add(column);
+      }
+    }
+
+    // From new column config.
+    Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();
+
+    for (String column : existingAllColumns) {
+      if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
+        // Column is RAW forward index encoded in both existing and new configs. Check if compression needs to change.
+        if (shouldChangeCompressionType(column, segmentReader)) {
+          columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+        }
+      }
+    }
+
+    return columnOperationMap;
+  }
+
+  private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception {
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+
+    // TODO: Remove this MV column limitation.
+    if (!existingColMetadata.isSingleValue()) {
+      return false;
+    }
+
+    // The compression type for an existing segment can only be determined by reading the forward index header.
+    try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata)) {
+      ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
+      Preconditions.checkState(existingCompressionType != null,
+          "Existing compressionType cannot be null for raw forward index column=" + column);
+
+      // Get the new compression type.
+      ChunkCompressionType newCompressionType = _indexLoadingConfig.getCompressionConfigs().get(column);
+
+      // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
+      // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
+      // forward indexes during segmentReload when the default compressionType changes.
+      return newCompressionType != null && existingCompressionType != newCompressionType;
+    }
+  }
+
+  private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+
+    Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+    Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column. If there's no entry in the map, we
+    // wouldn't have computed the CHANGE_RAW_INDEX_COMPRESSION_TYPE operation for this column, as compressionType
+    // changes are processed only if a valid compressionType is specified in fieldConfig.
+    ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+    int numDocs = existingColMetadata.getTotalDocs();
+
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+      Preconditions.checkState(lengthOfLongestEntry >= 0,
+          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
+
+      IndexCreationContext.Forward context =
+          IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
+
+      try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If the creator's stored type and the reader's stored type do not match, throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
+        }
+
+        PinotSegmentColumnReader columnReader =
+            new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
+
+        for (int i = 0; i < numDocs; i++) {
+          Object val = columnReader.getValue(i);
+
+          // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+          // decision based on the storedType of the reader.
+          switch (reader.getStoredType()) {
+            case INT:
+              creator.putInt((int) val);
+              break;
+            case LONG:
+              creator.putLong((long) val);
+              break;
+            case FLOAT:
+              creator.putFloat((float) val);
+              break;
+            case DOUBLE:
+              creator.putDouble((double) val);
+              break;
+            case STRING:
+              creator.putString((String) val);
+              break;
+            case BYTES:
+              creator.putBytes((byte[]) val);
+              break;
+            case BIG_DECIMAL:
+              creator.putBigDecimal((BigDecimal) val);
+              break;
+            default:
+              throw new IllegalStateException();
+          }
+        }
+      }
+    }
+
+    // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
+    // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
+    // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
+    // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
+    // called during segmentWriter.close().
+    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
+  }
+}
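
The rewrite above is made crash-safe with a marker file: the marker is created
before the destructive step and deleted only after the new index is fully
written, so a marker surviving into the next run signals an interrupted
rewrite. The idiom in isolation, as a sketch with hypothetical file names:

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    public final class MarkerFileSketch {
      static void rewriteWithMarker(File indexDir, String column) throws IOException {
        File inProgress = new File(indexDir, column + ".fwd.inprogress");
        File output = new File(indexDir, column + ".fwd");
        if (inProgress.exists()) {
          // The previous run was interrupted; the partially written output
          // cannot be trusted, so discard it and start over.
          FileUtils.deleteQuietly(output);
        } else {
          // Normal start: create the marker before touching the output.
          FileUtils.touch(inProgress);
        }
        // ... regenerate "output" from the still-intact source index ...
        // Deleting the marker is the commit point: its absence on the next
        // run means the rewrite completed.
        FileUtils.deleteQuietly(inProgress);
      }
    }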
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
index 5b2ba4706f..76132a7767 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
@@ -37,5 +37,6 @@ public interface IndexHandler {
    * Check if there is a need to add new indices or removes obsolete indices.
    * @return true if there is a need to update.
    */
-  boolean needUpdateIndices(SegmentDirectory.Reader segmentReader);
+  boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
+      throws Exception;
 }
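
The widened signature is needed because ForwardIndexHandler must open the
forward index reader (an I/O operation that can throw) just to decide whether
any work is pending, whereas earlier handlers could answer from config alone.
A minimal implementation under the new signature, as a sketch:

    import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
    import org.apache.pinot.segment.spi.store.SegmentDirectory;

    public class NoOpIndexHandler implements IndexHandler {
      @Override
      public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
          throws Exception {
        return false; // nothing to add or remove
      }

      @Override
      public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
        // no-op
      }
    }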
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index bc48ff9771..6be28d8bb9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -63,6 +63,8 @@ public class IndexHandlerFactory {
         return new H3IndexHandler(segmentMetadata, indexLoadingConfig);
       case BLOOM_FILTER:
         return new BloomFilterHandler(segmentMetadata, indexLoadingConfig);
+      case FORWARD_INDEX:
+        return new ForwardIndexHandler(segmentMetadata, indexLoadingConfig);
       default:
         return NO_OP_HANDLER;
     }
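
With this case registered, SegmentPreProcessor obtains the new handler through
the same factory path as every other index type. Roughly how a caller probes
it, as a sketch (the method fragment below assumes the usual loader imports;
it mirrors how SegmentPreProcessor drives the handlers):

    // Returns true when at least one raw column carries a compressionType
    // override that differs from the codec in its forward index header.
    static boolean forwardIndexNeedsRewrite(SegmentMetadata segmentMetadata,
        IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Reader segmentReader) throws Exception {
      IndexHandler handler =
          IndexHandlerFactory.getIndexHandler(ColumnIndexType.FORWARD_INDEX, segmentMetadata, indexLoadingConfig);
      return handler.needUpdateIndices(segmentReader);
    }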
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index d5c4b0d7bc..78dc118a78 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
 import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
@@ -75,6 +76,7 @@ public class IndexLoadingConfig {
   private boolean _enableDynamicStarTreeCreation;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private boolean _enableDefaultStarTree;
+  private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>();
 
   private SegmentVersion _segmentVersion;
   private ColumnMinMaxValueGeneratorMode _columnMinMaxValueGeneratorMode = ColumnMinMaxValueGeneratorMode.DEFAULT_MODE;
@@ -153,6 +155,7 @@ public class IndexLoadingConfig {
       }
     }
 
+    extractCompressionConfigs(tableConfig);
     extractTextIndexColumnsFromTableConfig(tableConfig);
     extractFSTIndexColumnsFromTableConfig(tableConfig);
     extractH3IndexConfigsFromTableConfig(tableConfig);
@@ -215,6 +218,28 @@ public class IndexLoadingConfig {
     }
   }
 
+  /**
+   * Extracts the compressionType for each column into a map from column name to compression type. The map only
+   * contains the compressionType overrides and does not reflect the default compressionType (derived using
+   * SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column. Note that only RAW forward index
+   * columns are populated in this map.
+   * @param tableConfig table config
+   */
+  private void extractCompressionConfigs(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      return;
+    }
+
+    for (FieldConfig fieldConfig : fieldConfigList) {
+      String column = fieldConfig.getName();
+      if (fieldConfig.getCompressionCodec() != null) {
+        ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name());
+        _compressionConfigs.put(column, compressionType);
+      }
+    }
+  }
+
   /**
    * Text index creation info for each column is specified
    * using {@link FieldConfig} model of indicating per column
@@ -371,6 +396,24 @@ public class IndexLoadingConfig {
     _invertedIndexColumns = invertedIndexColumns;
   }
 
+  /**
+   * For tests only.
+   * Used by SegmentPreProcessorTest to set raw (no-dictionary) columns.
+   */
+  @VisibleForTesting
+  public void setNoDictionaryColumns(Set<String> noDictionaryColumns) {
+    _noDictionaryColumns = noDictionaryColumns;
+  }
+
+  /**
+   * For tests only.
+   * Used by SegmentPreProcessorTest to set compression configs.
+   */
+  @VisibleForTesting
+  public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) {
+    _compressionConfigs = compressionConfigs;
+  }
+
   /**
    * For tests only.
    */
@@ -424,6 +467,18 @@ public class IndexLoadingConfig {
     return _noDictionaryColumns;
   }
 
+  /**
+   * Returns a map from column name to compression type. This map only contains the compressionType overrides and
+   * does not reflect the default compressionType (derived using
+   * SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column. Note that only RAW forward index
+   * columns are present in this map.
+   *
+   * @return a map containing column name as key and compressionType as value.
+   */
+  public Map<String, ChunkCompressionType> getCompressionConfigs() {
+    return _compressionConfigs;
+  }
+
   public Map<String, String> getnoDictionaryConfig() {
     return _noDictionaryConfig;
   }
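
extractCompressionConfigs() bridges two distinct enums by name:
FieldConfig.CompressionCodec on the table-config side and ChunkCompressionType
on the segment SPI side. The conversion works only because the constant names
line up; a sketch of the bridge:

    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
    import org.apache.pinot.spi.config.table.FieldConfig;

    public class CodecBridgeSketch {
      public static void main(String[] args) {
        // PASS_THROUGH, SNAPPY, ZSTANDARD and LZ4 exist under the same names
        // in both enums, so valueOf(name()) is a safe bridge; a codec added
        // to only one enum would fail here with IllegalArgumentException.
        for (FieldConfig.CompressionCodec codec : FieldConfig.CompressionCodec.values()) {
          ChunkCompressionType chunkType = ChunkCompressionType.valueOf(codec.name());
          System.out.println(codec + " -> " + chunkType);
        }
      }
    }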
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 202f41c2b2..3da4c2b524 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -104,8 +104,8 @@ public class SegmentPreProcessor implements AutoCloseable {
       // Update single-column indices, like inverted index, json index etc.
       IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
       for (ColumnIndexType type : ColumnIndexType.values()) {
-        IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
-            .updateIndices(segmentWriter, indexCreatorProvider);
+        IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig);
+        handler.updateIndices(segmentWriter, indexCreatorProvider);
       }
 
       // Create/modify/remove star-trees if required.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index beba74f9a2..e37fb4a987 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -48,6 +48,7 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
   protected final int _numDocsPerChunk;
   protected final int _lengthOfLongestEntry;
   protected final boolean _isCompressed;
+  protected final ChunkCompressionType _compressionType;
   protected final ChunkDecompressor _chunkDecompressor;
   protected final PinotDataBuffer _dataHeader;
   protected final int _headerEntryChunkOffsetSize;
@@ -79,15 +80,16 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
       _dataBuffer.getInt(headerOffset); // Total docs
       headerOffset += Integer.BYTES;
 
-      ChunkCompressionType compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
-      _chunkDecompressor = ChunkCompressorFactory.getDecompressor(compressionType);
-      _isCompressed = !compressionType.equals(ChunkCompressionType.PASS_THROUGH);
+      _compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
+      _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType);
+      _isCompressed = !_compressionType.equals(ChunkCompressionType.PASS_THROUGH);
 
       headerOffset += Integer.BYTES;
       dataHeaderStart = _dataBuffer.getInt(headerOffset);
     } else {
       _isCompressed = true;
-      _chunkDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.SNAPPY);
+      _compressionType = ChunkCompressionType.SNAPPY;
+      _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType);
     }
 
     _headerEntryChunkOffsetSize = BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version);
@@ -174,6 +176,16 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
     return _storedType;
   }
 
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    return _compressionType;
+  }
+
+  @Override
+  public int getLengthOfLongestEntry() {
+    return _lengthOfLongestEntry;
+  }
+
   @Override
   public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
     if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
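
These two accessors are what let ForwardIndexHandler work against an existing
segment: the codec is recorded only in the chunk header on disk (with SNAPPY
implied for the legacy header version that predates the codec field), so it
has to be read back through the reader. A sketch of the read side, as a method
fragment in the style of ForwardIndexHandler.shouldChangeCompressionType():

    static ChunkCompressionType readOnDiskCodec(SegmentDirectory.Reader segmentReader,
        ColumnMetadata columnMetadata) throws IOException {
      try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentReader, columnMetadata)) {
        // May be null for readers that do not carry a codec, which is why the
        // handler checks the result before comparing against the override.
        return reader.getCompressionType();
      }
    }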
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
new file mode 100644
index 0000000000..12bb8b61a2
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.IndexingOverrides;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ForwardIndexHandlerTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerTest");
+  private static final String TABLE_NAME = "myTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  // TODO:
+  // 1. Add other datatypes (double, float, bigdecimal, bytes). Also add MV columns.
+  // 2. Add text index and other index types for raw columns.
+  private static final String DIM_SNAPPY_STRING = "DIM_SNAPPY_STRING";
+  private static final String DIM_PASS_THROUGH_STRING = "DIM_PASS_THROUGH_STRING";
+  private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING";
+  private static final String DIM_LZ4_STRING = "DIM_LZ4_STRING";
+
+  private static final String DIM_SNAPPY_LONG = "DIM_SNAPPY_LONG";
+  private static final String DIM_PASS_THROUGH_LONG = "DIM_PASS_THROUGH_LONG";
+  private static final String DIM_ZSTANDARD_LONG = "DIM_ZSTANDARD_LONG";
+  private static final String DIM_LZ4_LONG = "DIM_LZ4_LONG";
+
+  private static final String DIM_SNAPPY_INTEGER = "DIM_SNAPPY_INTEGER";
+  private static final String DIM_PASS_THROUGH_INTEGER = "DIM_PASS_THROUGH_INTEGER";
+  private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
+  private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
+
+  private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
+  private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
+  private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
+
+  private static final String METRIC_PASSTHROUGH_INTEGER = "METRIC_PASSTHROUGH_INTEGER";
+  private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
+  private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER";
+  private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
+
+  private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
+      Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, METRIC_SNAPPY_INTEGER);
+
+  private static final List<String> RAW_ZSTANDARD_INDEX_COLUMNS =
+      Arrays.asList(DIM_ZSTANDARD_STRING, DIM_ZSTANDARD_LONG, DIM_ZSTANDARD_INTEGER, METRIC_ZSTANDARD_INTEGER);
+
+  private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
+      Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER,
+          METRIC_PASSTHROUGH_INTEGER);
+
+  private static final List<String> RAW_LZ4_INDEX_COLUMNS =
+      Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, METRIC_LZ4_INTEGER);
+
+  private final List<String> _noDictionaryColumns = new ArrayList<>();
+  TableConfig _tableConfig;
+  Schema _schema;
+  File _segmentDirectory;
+  private List<FieldConfig.CompressionCodec> _allCompressionTypes =
+      Arrays.asList(FieldConfig.CompressionCodec.values());
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Delete the index directory if it already exists.
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+  }
+
+  private void buildSegment()
+      throws Exception {
+    List<GenericRow> rows = createTestData();
+
+    List<FieldConfig> fieldConfigs = new ArrayList<>(
+        RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()
+            + RAW_LZ4_INDEX_COLUMNS.size());
+
+    for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.SNAPPY, null));
+    }
+
+    for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.ZSTANDARD, null));
+    }
+
+    for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.PASS_THROUGH, null));
+    }
+
+    for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.LZ4, null));
+    }
+
+    _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+
+    _tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+            .setFieldConfigList(fieldConfigs).build();
+    _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(DIM_SNAPPY_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DIM_PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DIM_ZSTANDARD_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DIM_LZ4_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DIM_SNAPPY_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_SNAPPY_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
+        .addMetric(METRIC_PASSTHROUGH_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
+        .build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+
+    _segmentDirectory = new File(INDEX_DIR, driver.getSegmentName());
+  }
+
+  private List<GenericRow> createTestData() {
+    List<GenericRow> rows = new ArrayList<>();
+
+    // Generate random data.
+    int rowLength = 1000;
+    Random random = new Random();
+    String[] tempStringRows = new String[rowLength];
+    Integer[] tempIntRows = new Integer[rowLength];
+    Long[] tempLongRows = new Long[rowLength];
+
+    for (int i = 0; i < rowLength; i++) {
+      // Add a fixed value to check for filter queries.
+      if (i % 10 == 0) {
+        tempStringRows[i] = "testRow";
+        tempIntRows[i] = 1001;
+        tempLongRows[i] = 1001L;
+      } else {
+        tempStringRows[i] = "n" + i;
+        tempIntRows[i] = i;
+        tempLongRows[i] = (long) i;
+      }
+    }
+
+    for (int i = 0; i < rowLength; i++) {
+      GenericRow row = new GenericRow();
+
+      // Raw String columns
+      row.putValue(DIM_SNAPPY_STRING, tempStringRows[i]);
+      row.putValue(DIM_ZSTANDARD_STRING, tempStringRows[i]);
+      row.putValue(DIM_PASS_THROUGH_STRING, tempStringRows[i]);
+      row.putValue(DIM_LZ4_STRING, tempStringRows[i]);
+
+      // Raw integer columns
+      row.putValue(DIM_SNAPPY_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_ZSTANDARD_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_PASSTHROUGH_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
+
+      // Raw long columns
+      row.putValue(DIM_SNAPPY_LONG, tempLongRows[i]);
+      row.putValue(DIM_ZSTANDARD_LONG, tempLongRows[i]);
+      row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]);
+      row.putValue(DIM_LZ4_LONG, tempLongRows[i]);
+
+      // Dictionary columns
+      row.putValue(DIM_DICT_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_DICT_LONG, tempLongRows[i]);
+      row.putValue(DIM_DICT_STRING, tempStringRows[i]);
+
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  @Test
+  public void testComputeOperation()
+      throws Exception {
+    // Setup
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+    // TEST1: Validate with zero changes. ForwardIndexHandler should be a No-Op.
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. ForwardIndexHandler should be a No-Op.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST3: Disable dictionary. ForwardIndexHandler should be a No-Op.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST4: Add a text index. ForwardIndexHandler should be a No-Op.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
+    indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST5: Change compression
+
+    // Create new tableConfig with the modified fieldConfigs.
+    List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+    FieldConfig config = fieldConfigs.remove(0);
+    FieldConfig newConfig = new FieldConfig(config.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.ZSTANDARD, null);
+    fieldConfigs.add(newConfig);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+            .setFieldConfigList(fieldConfigs).build();
+    tableConfig.setFieldConfigList(fieldConfigs);
+
+    indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.size(), 1);
+    assertEquals(operationMap.get(config.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+
+    // TEST6: Change compressionType for more than one column, and also add indexes on one of them.
+    fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+    FieldConfig config1 = fieldConfigs.remove(0);
+    FieldConfig config2 = fieldConfigs.remove(1);
+
+    FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.ZSTANDARD, null);
+    fieldConfigs.add(newConfig1);
+    FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.ZSTANDARD, null);
+    fieldConfigs.add(newConfig2);
+
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+            .setFieldConfigList(fieldConfigs).build();
+    tableConfig.setFieldConfigList(fieldConfigs);
+
+    indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+    indexLoadingConfig.getTextIndexColumns().add(config1.getName());
+    indexLoadingConfig.getInvertedIndexColumns().add(config1.getName());
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.size(), 2);
+    assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+    assertEquals(operationMap.get(config2.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+
+    // Tear down
+    segmentLocalFSDirectory.close();
+  }
+
+  @Test
+  public void testRewriteRawForwardIndexForSingleColumn()
+      throws Exception {
+    for (int i = 0; i < _noDictionaryColumns.size(); i++) {
+
+      // For every noDictionaryColumn, change the compressionType to all available types, one by one.
+      for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
+        // Setup
+        SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+        SegmentDirectory segmentLocalFSDirectory =
+            new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+        SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+        List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+        FieldConfig config = fieldConfigs.remove(i);
+        String columnName = config.getName();
+        FieldConfig.CompressionCodec newCompressionType = compressionType;
+
+        FieldConfig newConfig =
+            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType,
+                null);
+        fieldConfigs.add(newConfig);
+
+        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+            .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+        tableConfig.setFieldConfigList(fieldConfigs);
+
+        IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+        IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+        ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+
+        fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+
+        // Tear down before validation, because columns.psf and index map cleanup happens at segmentDirectory.close().
+        segmentLocalFSDirectory.close();
+
+        // Validation
+        validateIndexMap();
+        validateForwardIndex(columnName, newCompressionType);
+      }
+    }
+  }
+
+  @Test
+  public void testRewriteRawForwardIndexForMultipleColumns()
+      throws Exception {
+    // Setup
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+    List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+    Random rand = new Random();
+    int randomIdx = rand.nextInt(_allCompressionTypes.size());
+    FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx);
+
+    // Column 1
+    randomIdx = rand.nextInt(fieldConfigs.size());
+    FieldConfig config1 = fieldConfigs.remove(randomIdx);
+    String column1 = config1.getName();
+    FieldConfig newConfig1 =
+        new FieldConfig(column1, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
+    fieldConfigs.add(newConfig1);
+
+    // Column 2
+    randomIdx = rand.nextInt(fieldConfigs.size());
+    FieldConfig config2 = fieldConfigs.remove(randomIdx);
+    String column2 = config2.getName();
+    FieldConfig newConfig2 =
+        new FieldConfig(column2, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
+    fieldConfigs.add(newConfig2);
+
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+            .setFieldConfigList(fieldConfigs).build();
+    tableConfig.setFieldConfigList(fieldConfigs);
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+    IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+
+    // Tear down before validation, because columns.psf and index map cleanup happens at segmentDirectory.close().
+    segmentLocalFSDirectory.close();
+
+    validateIndexMap();
+    validateForwardIndex(column1, newCompressionType);
+    validateForwardIndex(column2, newCompressionType);
+  }
+
+  private void validateForwardIndex(String columnName, FieldConfig.CompressionCodec expectedCompressionType)
+      throws IOException {
+    // Setup
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+    ColumnMetadata columnMetadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
+
+    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata)) {
+      // Check the compression type recorded in the forward index header.
+      ChunkCompressionType fwdIndexCompressionType = forwardIndexReader.getCompressionType();
+      assertEquals(fwdIndexCompressionType.name(), expectedCompressionType.name());
+
+      PinotSegmentColumnReader columnReader =
+          new PinotSegmentColumnReader(forwardIndexReader, null, null, columnMetadata.getMaxNumberOfMultiValues());
+
+      for (int rowIdx = 0; rowIdx < columnMetadata.getTotalDocs(); rowIdx++) {
+        if (rowIdx % 10 == 0) {
+          Object val = columnReader.getValue(rowIdx);
+
+          FieldSpec.DataType dataType = forwardIndexReader.getStoredType();
+          if (dataType == FieldSpec.DataType.STRING) {
+            assertEquals((String) val, "testRow");
+          } else if (dataType == FieldSpec.DataType.INT) {
+            assertEquals((int) val, 1001, columnName + " " + rowIdx + " " + expectedCompressionType);
+          } else if (dataType == FieldSpec.DataType.LONG) {
+            assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
+          }
+        }
+      }
+    }
+  }
+
+  private void validateIndexMap()
+      throws IOException {
+    // Sanity check to make sure every column has exactly one forward index entry in the index map.
+    String segmentDir = INDEX_DIR + "/" + SEGMENT_NAME + "/v3";
+    File idxMapFile = new File(segmentDir, V1Constants.INDEX_MAP_FILE_NAME);
+    String indexMapStr = FileUtils.readFileToString(idxMapFile, StandardCharsets.UTF_8);
+    for (String columnName : _noDictionaryColumns) {
+      assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index.startOffset"), 1);
+      assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index.size"), 1);
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index b4de2f5e22..2bcad84f23 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -43,11 +43,13 @@ import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.Col
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -308,6 +310,60 @@ public class SegmentPreProcessorTest {
     checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
   }
 
+  @Test
+  public void testForwardIndexHandler()
+      throws Exception {
+    Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>();
+    ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
+    _indexLoadingConfig.setNoDictionaryColumns(new HashSet<String>() {{
+      add(EXISTING_STRING_COL_RAW);
+    }});
+
+    // Test1: Rewriting forward index will be a no-op for v1 segments. Default LZ4 compressionType will be retained.
+    constructV1Segment();
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4);
+
+    // Convert the segment to V3.
+    new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+    // Test2: Now forward index will be rewritten with ZSTANDARD compressionType.
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType);
+
+    // Test3: Change compression on an existing raw column and add a text index on the same column. Check correctness.
+    newCompressionType = ChunkCompressionType.SNAPPY;
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    Set<String> textIndexColumns = new HashSet<>();
+    textIndexColumns.add(EXISTING_STRING_COL_RAW);
+    _indexLoadingConfig.setTextIndexColumns(textIndexColumns);
+
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
+    assertNotNull(columnMetadata);
+    checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
+    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
+        0, newCompressionType);
+
+    // Test4: Change compression on a RAW index column and add an FST index on another column. Check correctness.
+    newCompressionType = ChunkCompressionType.ZSTANDARD;
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    Set<String> fstColumns = new HashSet<>();
+    fstColumns.add(EXISTING_STRING_COL_DICT);
+    _indexLoadingConfig.setFSTIndexColumns(fstColumns);
+
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+    assertNotNull(columnMetadata);
+    // Check FST index
+    checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
+    // Check forward index.
+    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
+        0, newCompressionType);
+  }
+
   /**
    * Test to check for default column handling and text index creation during
    * segment load after a new dictionary encoded column is added to the schema
@@ -406,28 +462,36 @@ public class SegmentPreProcessorTest {
   private void checkFSTIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean isSorted, int dictionaryElementSize)
       throws Exception {
-    checkIndexCreation(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true, isSorted,
-        dictionaryElementSize, true, 0);
+    createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true,
+        isSorted, dictionaryElementSize, true, 0, null);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
-    checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
-        isSorted, dictionaryElementSize, true, 0);
+    createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+        hasDictionary, isSorted, dictionaryElementSize, true, 0, null);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize, boolean isSingleValue,
       int maxNumberOfMultiValues)
       throws Exception {
-    checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
-        isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues);
+    createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null);
   }
 
-  private void checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
+  private void checkForwardIndexCreation(String column, int cardinality, int bits, Schema schema,
       boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
-      boolean isSingleValued, int maxNumberOfMultiValues)
+      ChunkCompressionType expectedCompressionType)
+      throws Exception {
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+        hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType);
+  }
+
+  private void createAndValidateIndex(ColumnIndexType indexType, String column, int cardinality, int bits,
+      Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
+      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
       throws Exception {
 
     try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -435,33 +499,59 @@ public class SegmentPreProcessorTest {
             new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
       processor.process();
-      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
-      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
-      assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
-      assertEquals(columnMetadata.getCardinality(), cardinality);
-      assertEquals(columnMetadata.getTotalDocs(), 100000);
-      assertEquals(columnMetadata.getBitsPerElement(), bits);
-      assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
-      assertEquals(columnMetadata.isSorted(), isSorted);
-      assertEquals(columnMetadata.hasDictionary(), hasDictionary);
-      assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
-      assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
-      assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
-
-      try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
-          .load(_indexDir.toURI(),
-              new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
-          SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
-        assertTrue(reader.hasIndexFor(column, indexType));
-        assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
-        // if the text index is enabled on a new column with dictionary,
-        // then dictionary should be created by the default column handler
-        if (hasDictionary) {
-          assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+      validateIndex(indexType, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, isSorted,
+          dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType);
+    }
+  }
+
+  private void validateIndex(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
+      boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
+      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+      throws Exception {
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+    assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
+    assertEquals(columnMetadata.getCardinality(), cardinality);
+    assertEquals(columnMetadata.getTotalDocs(), 100000);
+    assertEquals(columnMetadata.getBitsPerElement(), bits);
+    assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
+    assertEquals(columnMetadata.isSorted(), isSorted);
+    assertEquals(columnMetadata.hasDictionary(), hasDictionary);
+    assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
+    assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
+    assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
+
+    try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
+      assertTrue(reader.hasIndexFor(column, indexType));
+      assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+
+      // Check if the raw forward index compressionType is correct.
+      if (expectedCompressionType != null) {
+        try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(reader, columnMetadata)) {
+          ChunkCompressionType compressionType = fwdIndexReader.getCompressionType();
+          assertEquals(compressionType, expectedCompressionType);
+        }
+
+        File inProgressFile = new File(_indexDir, column + ".fwd.inprogress");
+        assertFalse(inProgressFile.exists());
+        File v1FwdIndexFile = new File(_indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+        if (segmentMetadata.getVersion() == SegmentVersion.v3) {
+          assertFalse(v1FwdIndexFile.exists());
         } else {
-          assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+          assertTrue(v1FwdIndexFile.exists());
         }
       }
+
+      // If the text index is enabled on a new column with a dictionary,
+      // then the dictionary should be created by the default column handler.
+      if (hasDictionary) {
+        assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+      } else {
+        assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+      }
     }
   }
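For orientation, a sketch of how the new checkForwardIndexCreation helper is meant
to be invoked from a test; the column name and expected stats below are illustrative
only, not taken from this commit:

    // Verify that, after reload, this raw single-value column is rewritten
    // with SNAPPY compression (illustrative column name and stats).
    checkForwardIndexCreation("myRawColumn", 5 /* cardinality */, 3 /* bits */, _schema,
        false /* isAutoGenerated */, false /* hasDictionary */, false /* isSorted */,
        0 /* dictionaryElementSize */, ChunkCompressionType.SNAPPY);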
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 3ffa80ce97..86331a28ae 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.spi.index.reader;
 import java.io.Closeable;
 import java.math.BigDecimal;
 import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 
@@ -49,6 +50,24 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
    */
   DataType getStoredType();
 
+  /**
+   * Returns the compression type of the forward index. Only valid for raw forward index columns implemented in
+   * BaseChunkForwardIndexReader.
+   * @return the chunk compression type, or {@code null} if the reader is not a raw forward index reader
+   */
+  default ChunkCompressionType getCompressionType() {
+    return null;
+  }
+
+  /**
+   * Returns the length of the longest entry. Only valid for raw forward index columns implemented in
+   * BaseChunkForwardIndexReader.
+   * @return the length of the longest entry, or -1 if the reader is not a raw forward index reader
+   */
+  default int getLengthOfLongestEntry() {
+    return -1;
+  }
+
   /**
    * Creates a new {@link ForwardIndexReaderContext} of the reader which can be used to accelerate the reads.
    * NOTE: Caller is responsible for closing the returned reader context.
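A minimal sketch of how a caller might consume these two new default methods, e.g.
with a reader obtained via LoaderUtils.getForwardIndexReader as in the test above
(the helper name below is hypothetical, not part of this commit):

    // Logs raw-index properties when present; dictionary-encoded readers keep
    // the interface defaults (null compression type, -1 longest entry length).
    private static void logRawIndexInfo(ForwardIndexReader<?> reader) {
      ChunkCompressionType compressionType = reader.getCompressionType();
      if (compressionType != null) {
        // Raw forward index backed by BaseChunkForwardIndexReader.
        System.out.println("compression=" + compressionType
            + ", longestEntry=" + reader.getLengthOfLongestEntry());
      }
    }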
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
index 2650fb6bd5..37c9d608af 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.segment.spi.store;
 
+// Note: DICTIONARY and FORWARD_INDEX must remain the first two entries in this enum to ensure correctness when
+// preprocessing a segment during segmentReload. Please add new entries at the end (or at least below DICTIONARY
+// and FORWARD_INDEX).
 public enum ColumnIndexType {
   DICTIONARY("dictionary"),
   FORWARD_INDEX("forward_index"),
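To illustrate why this ordering matters, a self-contained sketch; the enum and
handler map below are hypothetical stand-ins (the real wiring lives in
IndexHandlerFactory), assuming handlers run in values() declaration order:

    import java.util.EnumMap;
    import java.util.Map;

    public final class ReloadOrderSketch {
      enum IndexKind { DICTIONARY, FORWARD_INDEX, INVERTED_INDEX }

      public static void main(String[] args) {
        Map<IndexKind, Runnable> handlers = new EnumMap<>(IndexKind.class);
        handlers.put(IndexKind.DICTIONARY, () -> System.out.println("update dictionary"));
        handlers.put(IndexKind.FORWARD_INDEX,
            () -> System.out.println("rewrite forward index with the new compressionType"));
        handlers.put(IndexKind.INVERTED_INDEX,
            () -> System.out.println("rebuild an index that reads from the forward index"));
        // values() iterates in declaration order, so DICTIONARY and FORWARD_INDEX
        // are processed before any index that depends on them.
        for (IndexKind kind : IndexKind.values()) {
          handlers.get(kind).run();
        }
      }
    }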


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org