You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/01 17:41:43 UTC

[incubator-pinot] branch master updated: Clean up SegmentMetadata and ColumnMetadata (#7104)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f38609b  Clean up SegmentMetadata and ColumnMetadata (#7104)
f38609b is described below

commit f38609b8dae9ccd1b2f171de0212aae3ce47bcd8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 1 10:41:30 2021 -0700

    Clean up SegmentMetadata and ColumnMetadata (#7104)
    
    - Change `ColumnMetadata` to an interface, and rename the existing `ColumnMetadata` class to `ColumnMetadataImpl`
    - Add APIs related to `ColumnMetadata` into `SegmentMetadata`
    - Clean up `SegmentMetadataImpl` and `ColumnMetadataImpl`
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 100 ++--
 .../helix/core/util/ZKMetadataUtils.java           |  13 +-
 .../controller/api/PinotSegmentsMetadataTest.java  |   4 -
 .../PinotLLCRealtimeSegmentManagerTest.java        |   7 +-
 .../controller/utils/SegmentMetadataMockUtils.java |   6 +-
 .../pinot/core/minion/RawIndexConverter.java       |  13 +-
 ...vertToRawIndexMinionClusterIntegrationTest.java |   6 +-
 .../pinot/perf/BenchmarkOfflineIndexReader.java    |   2 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |   2 +-
 .../indexsegment/immutable/EmptyIndexSegment.java  |   6 +-
 .../immutable/ImmutableSegmentImpl.java            |   2 +-
 .../immutable/ImmutableSegmentLoader.java          |   4 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   6 +-
 .../index/column/BaseVirtualColumnProvider.java    |  12 +-
 .../DefaultNullValueVirtualColumnProvider.java     |   8 +-
 .../index/column/PhysicalColumnIndexContainer.java |   2 +-
 .../converter/SegmentV1V2ToV3FormatConverter.java  |   2 +-
 .../segment/index/datasource/EmptyDataSource.java  |   2 +-
 .../index/datasource/ImmutableDataSource.java      |   2 +-
 .../local/segment/index/loader/LoaderUtils.java    |   5 +-
 .../loader/bloomfilter/BloomFilterHandler.java     |   4 +-
 .../ColumnMinMaxValueGenerator.java                |   2 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  23 +-
 .../defaultcolumn/DefaultColumnHandlerFactory.java |   2 +-
 .../index/loader/invertedindex/H3IndexHandler.java |   4 +-
 .../loader/invertedindex/InvertedIndexHandler.java |   8 +-
 .../loader/invertedindex/JsonIndexHandler.java     |   7 +-
 .../invertedindex/LuceneFSTIndexHandler.java       |   8 +-
 .../loader/invertedindex/RangeIndexHandler.java    |   8 +-
 .../loader/invertedindex/TextIndexHandler.java     |  11 +-
 .../local/segment/store/FilePerIndexDirectory.java |  36 +-
 .../segment/store/SegmentLocalFSDirectory.java     |   8 +-
 .../virtualcolumn/DocIdVirtualColumnProvider.java  |   9 +-
 .../virtualcolumn/VirtualColumnProvider.java       |   2 +-
 .../local/startree/StarTreeBuilderUtils.java       |   4 +-
 .../startree/v2/builder/MultipleTreesBuilder.java  |   5 +-
 .../v2/builder/StarTreeV2BuilderConfig.java        |   6 +-
 .../startree/v2/store/StarTreeLoaderUtils.java     |   2 +-
 .../local/segment/creator/IntArraysTest.java       |   6 +-
 .../local/segment/index/ColumnMetadataTest.java    |  59 +-
 .../segment/index/SegmentMetadataImplTest.java     |   6 +-
 .../DefaultNullValueVirtualColumnProviderTest.java | 208 +++----
 .../SegmentV1V2ToV3FormatConverterTest.java        |   4 +-
 .../SegmentGenerationWithBytesTypeTest.java        |   6 +-
 .../index/creator/SegmentPartitionTest.java        |  10 +-
 .../local/segment/index/loader/LoaderTest.java     |  38 +-
 .../index/loader/SegmentPreProcessorTest.java      | 194 +++----
 .../store/ColumnIndexDirectoryTestHelper.java      |  54 +-
 .../store/SingleFileIndexDirectoryTest.java        |   2 +-
 .../v2/builder/StarTreeV2BuilderConfigTest.java    |   2 +-
 .../apache/pinot/segment/spi/ColumnMetadata.java   |  90 +++
 .../apache/pinot/segment/spi/SegmentMetadata.java  |  59 +-
 .../org/apache/pinot/segment/spi/V1Constants.java  |  18 +-
 .../segment/spi/index/metadata/ColumnMetadata.java | 623 ---------------------
 .../spi/index/metadata/ColumnMetadataImpl.java     | 390 +++++++++++++
 .../spi/index/metadata/SegmentMetadataImpl.java    | 284 ++--------
 .../api/resources/SegmentMetadataFetcher.java      |   6 +-
 .../pinot/server/api/resources/TablesResource.java |   6 +-
 .../pinot/server/api/TablesResourceTest.java       |  48 +-
 .../anonymizer/ArrayBasedGlobalDictionaries.java   |   2 +-
 .../pinot/tools/anonymizer/GlobalDictionaries.java |   2 +-
 .../anonymizer/MapBasedGlobalDictionaries.java     |   2 +-
 .../anonymizer/PinotDataAndQueryAnonymizer.java    |   2 +-
 .../apache/pinot/tools/scan/query/Projection.java  |   2 +-
 .../tools/scan/query/SegmentQueryProcessor.java    |   3 +-
 65 files changed, 1019 insertions(+), 1460 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d26a434..b05715a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -63,8 +63,10 @@ import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -254,10 +256,11 @@ public class PinotLLCRealtimeSegmentManager {
 
     _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
 
-    PartitionLevelStreamConfig streamConfig =
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
@@ -268,8 +271,9 @@ public class PinotLLCRealtimeSegmentManager {
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
-      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata,
-          currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+      String segmentName =
+          setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+              numPartitionGroups, numReplicas);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -504,7 +508,8 @@ public class PinotLLCRealtimeSegmentManager {
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
     Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+            .collect(Collectors.toSet());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
 
     // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new segment metadata
@@ -515,7 +520,8 @@ public class PinotLLCRealtimeSegmentManager {
       LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
           committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
       createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas);
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups,
+          numReplicas);
       newConsumingSegmentName = newLLCSegment.getSegmentName();
     }
 
@@ -586,7 +592,10 @@ public class PinotLLCRealtimeSegmentManager {
       committingSegmentZKMetadata.setEndTime(now);
     }
     committingSegmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
-    committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
+    SegmentVersion segmentVersion = segmentMetadata.getVersion();
+    if (segmentVersion != null) {
+      committingSegmentZKMetadata.setIndexVersion(segmentVersion.name());
+    }
     committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
 
     // Update the partition group metadata based on the segment metadata
@@ -637,7 +646,8 @@ public class PinotLLCRealtimeSegmentManager {
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas));
+        committingSegmentZKMetadata,
+        getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas));
 
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
@@ -669,10 +679,11 @@ public class PinotLLCRealtimeSegmentManager {
     for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
       // NOTE: There is at most one partition column.
       ColumnMetadata columnMetadata = entry.getValue();
-      if (columnMetadata.getPartitionFunction() != null) {
+      PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
+      if (partitionFunction != null) {
         ColumnPartitionMetadata columnPartitionMetadata =
-            new ColumnPartitionMetadata(columnMetadata.getPartitionFunction().toString(),
-                columnMetadata.getNumPartitions(), columnMetadata.getPartitions());
+            new ColumnPartitionMetadata(partitionFunction.toString(), partitionFunction.getNumPartitions(),
+                columnMetadata.getPartitions());
         return new SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(), columnPartitionMetadata));
       }
     }
@@ -706,8 +717,8 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
       List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
-    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
-        currentPartitionGroupConsumptionStatusList);
+    return PinotTableIdealStateBuilder
+        .getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
   }
 
   /**
@@ -756,15 +767,15 @@ public class PinotLLCRealtimeSegmentManager {
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
       latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
         if (latestLLCSegmentName == null) {
-              return llcSegmentName;
-            } else {
-              if (llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
-                return llcSegmentName;
-              } else {
-                return latestLLCSegmentName;
-              }
-            }
-          });
+          return llcSegmentName;
+        } else {
+          if (llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
+            return llcSegmentName;
+          } else {
+            return latestLLCSegmentName;
+          }
+        }
+      });
     }
 
     Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap = new HashMap<>();
@@ -815,7 +826,6 @@ public class PinotLLCRealtimeSegmentManager {
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
-
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
         return idealState;
@@ -876,14 +886,17 @@ public class PinotLLCRealtimeSegmentManager {
       for (String segmentNameStr : instanceStatesMap.keySet()) {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
         if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) {
-          String errorMsg = String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
           LOGGER.error(errorMsg);
           throw new HelixHelper.PermanentUpdaterException(errorMsg);
         }
       }
       // Assign instances to the new segment and add instances as state CONSUMING
-      List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
-      instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      instanceStatesMap.put(newSegmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -964,7 +977,8 @@ public class PinotLLCRealtimeSegmentManager {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
     Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+            .collect(Collectors.toSet());
 
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1016,17 +1030,17 @@ public class PinotLLCRealtimeSegmentManager {
 
               LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
               String newSegmentName = newLLCSegmentName.getSegmentName();
-              CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-                  (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+              CommittingSegmentDescriptor committingSegmentDescriptor =
+                  new CommittingSegmentDescriptor(latestSegmentName,
+                      (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
-              LOGGER.info(
-                  "PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
-                      + "Skipping creation of new ZK metadata and new segment in ideal state",
-                  partitionGroupId, latestSegmentName);
+              LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+                      + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId,
+                  latestSegmentName);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
                   instancePartitionsMap);
             }
@@ -1068,7 +1082,8 @@ public class PinotLLCRealtimeSegmentManager {
               // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
               // that case, we need to either extend this part to handle the state, or prevent segments from getting into
               // such state.
-              LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+              LOGGER
+                  .error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
             }
             // else, the partition group has reached end of life. This is an acceptable state
           }
@@ -1116,8 +1131,8 @@ public class PinotLLCRealtimeSegmentManager {
       int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions,
-                numReplicas);
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+                numPartitions, numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1154,8 +1169,9 @@ public class PinotLLCRealtimeSegmentManager {
    * Sets up a new partition group.
    * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
    */
-  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata,
-      long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
+  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
+      PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
+      int numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1166,15 +1182,13 @@ public class PinotLLCRealtimeSegmentManager {
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        new CommittingSegmentDescriptor(null, startOffset, 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
 
     return newSegmentName;
   }
 
-
   @VisibleForTesting
   long getCurrentTimeMs() {
     return System.currentTimeMillis();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 2aabb52..7dfc369 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -23,8 +23,9 @@ import java.util.Map;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
@@ -39,7 +40,10 @@ public class ZKMetadataUtils {
       SegmentType segmentType) {
     segmentZKMetadata.setSegmentName(segmentMetadata.getName());
     segmentZKMetadata.setTableName(segmentMetadata.getTableName());
-    segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
+    SegmentVersion segmentVersion = segmentMetadata.getVersion();
+    if (segmentVersion != null) {
+      segmentZKMetadata.setIndexVersion(segmentVersion.name());
+    }
     segmentZKMetadata.setSegmentType(segmentType);
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
@@ -57,15 +61,14 @@ public class ZKMetadataUtils {
     // Extract column partition metadata (if any), and set it into segment ZK metadata.
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
     if (segmentMetadata instanceof SegmentMetadataImpl) {
-      SegmentMetadataImpl metadata = (SegmentMetadataImpl) segmentMetadata;
-      for (Map.Entry<String, ColumnMetadata> entry : metadata.getColumnMetadataMap().entrySet()) {
+      for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
         String column = entry.getKey();
         ColumnMetadata columnMetadata = entry.getValue();
         PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
 
         if (partitionFunction != null) {
           ColumnPartitionMetadata columnPartitionMetadata =
-              new ColumnPartitionMetadata(partitionFunction.toString(), columnMetadata.getNumPartitions(),
+              new ColumnPartitionMetadata(partitionFunction.toString(), partitionFunction.getNumPartitions(),
                   columnMetadata.getPartitions());
           columnPartitionMap.put(column, columnPartitionMetadata);
         }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
index 086628a..25664d4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
@@ -216,10 +216,6 @@ public class PinotSegmentsMetadataTest {
             "  \"startTimeReadable\" : null,\n" +
             "  \"endTimeMillis\" : null,\n" +
             "  \"endTimeReadable\" : null,\n" +
-            "  \"pushTimeMillis\" : -9223372036854775808,\n" +
-            "  \"pushTimeReadable\" : null,\n" +
-            "  \"refreshTimeMillis\" : -9223372036854775808,\n" +
-            "  \"refreshTimeReadable\" : null,\n" +
             "  \"segmentVersion\" : \"v3\",\n" +
             "  \"creatorName\" : null,\n" +
             "  \"paddingCharacter\" : \"\\u0000\",\n" +
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 64c5cb8..e2046cd 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -91,8 +91,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   static final long END_TIME_MS = START_TIME_MS + TimeUnit.HOURS.toMillis(RANDOM.nextInt(24) + 1);
   static final Interval INTERVAL = new Interval(START_TIME_MS, END_TIME_MS);
   static final String CRC = Long.toString(RANDOM.nextLong());
-  static final String SEGMENT_VERSION =
-      RANDOM.nextBoolean() ? SegmentVersion.v1.toString() : SegmentVersion.v3.toString();
+  static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ? SegmentVersion.v1 : SegmentVersion.v3;
   static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
 
   @AfterClass
@@ -214,7 +213,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
         new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
     assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
     assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
-    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
+    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
     assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
 
     LLCRealtimeSegmentZKMetadata consumingSegmentZKMetadata =
@@ -289,7 +288,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(committedSegmentZKMetadata.getEndOffset(), committingSegmentEndOffset);
     assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
     assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
-    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
+    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
     assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
 
     consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 3f18b9e..e6d68f8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.MurmurPartitionFunction;
 import org.joda.time.Interval;
@@ -46,8 +46,6 @@ public class SegmentMetadataMockUtils {
     Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
     Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(numTotalDocs);
     Mockito.when(segmentMetadata.getCrc()).thenReturn(crc);
-    Mockito.when(segmentMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
-    Mockito.when(segmentMetadata.getRefreshTime()).thenReturn(Long.MIN_VALUE);
     Mockito.when(segmentMetadata.getEndTime()).thenReturn(10L);
     Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(new Interval(0, 20));
     Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
@@ -100,8 +98,6 @@ public class SegmentMetadataMockUtils {
     Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
     Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(10);
     Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
-    Mockito.when(segmentMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
-    Mockito.when(segmentMetadata.getRefreshTime()).thenReturn(Long.MIN_VALUE);
     Mockito.when(segmentMetadata.getEndTime()).thenReturn(endTime);
     Mockito.when(segmentMetadata.getTimeInterval()).thenReturn(new Interval(endTime - 10, endTime + 10));
     Mockito.when(segmentMetadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index 21c594d..d305cbd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -31,14 +31,14 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexC
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.utils.CrcUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -78,7 +78,7 @@ public class RawIndexConverter {
 
   private final String _rawTableName;
   private final ImmutableSegment _originalImmutableSegment;
-  private final SegmentMetadataImpl _originalSegmentMetadata;
+  private final SegmentMetadata _originalSegmentMetadata;
   private final File _convertedIndexDir;
   private final PropertiesConfiguration _convertedProperties;
   private final String _columnsToConvert;
@@ -96,7 +96,7 @@ public class RawIndexConverter {
     indexLoadingConfig.setReadMode(ReadMode.mmap);
     _rawTableName = rawTableName;
     _originalImmutableSegment = ImmutableSegmentLoader.load(originalIndexDir, indexLoadingConfig);
-    _originalSegmentMetadata = (SegmentMetadataImpl) _originalImmutableSegment.getSegmentMetadata();
+    _originalSegmentMetadata = _originalImmutableSegment.getSegmentMetadata();
     _convertedIndexDir = convertedIndexDir;
     _convertedProperties =
         new PropertiesConfiguration(new File(_convertedIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
@@ -113,7 +113,8 @@ public class RawIndexConverter {
     if (_columnsToConvert == null) {
       LOGGER.info("Columns to convert are not specified, check each metric column");
       for (MetricFieldSpec metricFieldSpec : schema.getMetricFieldSpecs()) {
-        if (_originalSegmentMetadata.hasDictionary(metricFieldSpec.getName()) && shouldConvertColumn(metricFieldSpec)) {
+        if (_originalSegmentMetadata.getColumnMetadataFor(metricFieldSpec.getName()).hasDictionary()
+            && shouldConvertColumn(metricFieldSpec)) {
           columnsToConvert.add(metricFieldSpec);
         }
       }
@@ -129,7 +130,7 @@ public class RawIndexConverter {
           LOGGER.warn("Skip converting column: {} because it's a multi-value column", columnsToConvert);
           continue;
         }
-        if (!_originalSegmentMetadata.hasDictionary(columnToConvert)) {
+        if (!_originalSegmentMetadata.getColumnMetadataFor(columnToConvert).hasDictionary()) {
           LOGGER.warn("Skip converting column: {} because its index is not dictionary-based", columnsToConvert);
           continue;
         }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 83312b2..7edf098 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -104,7 +104,7 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
     for (File indexDir : indexDirs) {
       SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
       for (String columnName : segmentMetadata.getSchema().getColumnNames()) {
-        Assert.assertTrue(segmentMetadata.hasDictionary(columnName));
+        Assert.assertTrue(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary());
       }
     }
 
@@ -155,11 +155,11 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
         List<String> rawIndexColumns = Arrays.asList(StringUtils.split(COLUMNS_TO_CONVERT, ','));
         for (String columnName : segmentMetadata.getSchema().getColumnNames()) {
           if (rawIndexColumns.contains(columnName)) {
-            if (segmentMetadata.hasDictionary(columnName)) {
+            if (segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
               return false;
             }
           } else {
-            if (!segmentMetadata.hasDictionary(columnName)) {
+            if (!segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
               return false;
             }
           }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
index fc93f9d..7f34d13 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOfflineIndexReader.java
@@ -40,8 +40,8 @@ import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index a4538a7..2c664af 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -36,8 +36,8 @@ import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index d9c56c5..98a9cd6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -23,18 +23,16 @@ import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.index.datasource.EmptyDataSource;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -42,8 +40,6 @@ import org.slf4j.LoggerFactory;
  * Such an IndexSegment contains only the metadata, and no indexes
  */
 public class EmptyIndexSegment implements ImmutableSegment {
-  private static final Logger LOGGER = LoggerFactory.getLogger(EmptyIndexSegment.class);
-
   private final SegmentMetadataImpl _segmentMetadata;
 
   public EmptyIndexSegment(SegmentMetadataImpl segmentMetadata) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 4f110df..d642179 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -28,11 +28,11 @@ import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSour
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index fd1ae57..7ca7246 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -33,11 +33,11 @@ import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
@@ -87,7 +87,7 @@ public class ImmutableSegmentLoader {
     SegmentMetadataImpl localSegmentMetadata = new SegmentMetadataImpl(indexDir);
     if (segmentVersionToLoad != null && !SegmentDirectoryPaths.segmentDirectoryFor(indexDir, segmentVersionToLoad)
         .isDirectory()) {
-      SegmentVersion segmentVersionOnDisk = localSegmentMetadata.getSegmentVersion();
+      SegmentVersion segmentVersionOnDisk = localSegmentMetadata.getVersion();
       if (segmentVersionOnDisk != segmentVersionToLoad) {
         LOGGER.info("Segment: {} needs to be converted from version: {} to {}", segmentName, segmentVersionOnDisk,
             segmentVersionToLoad);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 96f71fe..b16cade 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -171,9 +171,7 @@ public class MutableSegmentImpl implements MutableSegment {
     final RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = config.getRealtimeSegmentZKMetadata();
     _segmentMetadata =
         new SegmentMetadataImpl(realtimeSegmentZKMetadata.getTableName(), realtimeSegmentZKMetadata.getSegmentName(),
-            realtimeSegmentZKMetadata.getCreationTime(), realtimeSegmentZKMetadata.getStartTime(),
-            realtimeSegmentZKMetadata.getEndTime(), realtimeSegmentZKMetadata.getTimeUnit(),
-            realtimeSegmentZKMetadata.getTotalDocs(), realtimeSegmentZKMetadata.getCrc(), _schema) {
+            _schema, realtimeSegmentZKMetadata.getCreationTime()) {
           @Override
           public int getTotalDocs() {
             return _numDocsIndexed;
@@ -876,8 +874,6 @@ public class MutableSegmentImpl implements MutableSegment {
       }
     }
 
-    _segmentMetadata.close();
-
     // NOTE: Close the memory manager as the last step. It will release all the PinotDataBuffers allocated.
     try {
       _memoryManager.close();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
index 4c54e57..8f6528f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
@@ -22,8 +22,7 @@ import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 
 
 /**
@@ -31,12 +30,9 @@ import org.apache.pinot.spi.data.FieldSpec;
  */
 public abstract class BaseVirtualColumnProvider implements VirtualColumnProvider {
 
-  protected ColumnMetadata.Builder getColumnMetadataBuilder(VirtualColumnContext context) {
-    FieldSpec fieldSpec = context.getFieldSpec();
-    return new ColumnMetadata.Builder().setVirtual(true).setColumnName(fieldSpec.getName())
-        .setFieldType(fieldSpec.getFieldType()).setDataType(fieldSpec.getDataType())
-        .setTotalDocs(context.getTotalDocCount()).setSingleValue(fieldSpec.isSingleValueField())
-        .setDefaultNullValueString(context.getFieldSpec().getDefaultNullValueString());
+  protected ColumnMetadataImpl.Builder getColumnMetadataBuilder(VirtualColumnContext context) {
+    return new ColumnMetadataImpl.Builder().setFieldSpec(context.getFieldSpec())
+        .setTotalDocs(context.getTotalDocCount());
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
index 0405547..23ed46d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
@@ -28,7 +28,7 @@ import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVF
 import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVInvertedIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantSortedIndexReader;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
@@ -80,8 +80,8 @@ public class DefaultNullValueVirtualColumnProvider extends BaseVirtualColumnProv
   }
 
   @Override
-  public ColumnMetadata buildMetadata(VirtualColumnContext context) {
-    return getColumnMetadataBuilder(context).setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true)
-        .setIsSorted(context.getFieldSpec().isSingleValueField()).build();
+  public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+    return getColumnMetadataBuilder(context).setCardinality(1).setSorted(context.getFieldSpec().isSingleValueField())
+        .setHasDictionary(true).build();
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 43ab10c..d6f3ba7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -48,8 +48,8 @@ import org.apache.pinot.segment.local.segment.index.readers.geospatial.Immutable
 import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
 import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
index 129d968..ecb3fc3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
@@ -71,7 +71,7 @@ public class SegmentV1V2ToV3FormatConverter implements SegmentFormatConverter {
 
     // check existing segment version
     SegmentMetadataImpl v2Metadata = new SegmentMetadataImpl(v2SegmentDirectory);
-    SegmentVersion oldVersion = SegmentVersion.valueOf(v2Metadata.getVersion());
+    SegmentVersion oldVersion = v2Metadata.getVersion();
     Preconditions.checkState(oldVersion != SegmentVersion.v3, "Segment {} is already in v3 format but at wrong path",
         v2Metadata.getName());
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
index 52adf88..4ceeaf9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
@@ -20,8 +20,8 @@ package org.apache.pinot.segment.local.segment.index.datasource;
 
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.data.FieldSpec;
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/ImmutableDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/ImmutableDataSource.java
index 633ff20..415bd8f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/ImmutableDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/ImmutableDataSource.java
@@ -20,9 +20,9 @@ package org.apache.pinot.segment.local.segment.index.datasource;
 
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.data.FieldSpec;
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
index 8e47f9f..a5b0843 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import javax.annotation.Nonnull;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
@@ -33,7 +32,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVFo
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -157,7 +156,7 @@ public class LoaderUtils {
    * </ul>
    * <p>Should be called before trying to load the segment or metadata from index directory.
    */
-  public static void reloadFailureRecovery(@Nonnull File indexDir)
+  public static void reloadFailureRecovery(File indexDir)
       throws IOException {
     File parentDir = indexDir.getParentFile();
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index c2c8927..f0aedc7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -34,10 +34,10 @@ import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -64,7 +64,7 @@ public class BloomFilterHandler {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
     _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
 
     for (String column : _bloomFilterConfigs.keySet()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 7c7e442..b87d0ea 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -30,7 +30,7 @@ import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 351b4f1..6329be2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -50,12 +50,12 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPre
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.TextIndexType;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -66,11 +66,9 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.spi.data.FieldSpec.DataType.BYTES;
 import static org.apache.pinot.spi.data.FieldSpec.FieldType.DATE_TIME;
 import static org.apache.pinot.spi.data.FieldSpec.FieldType.DIMENSION;
 import static org.apache.pinot.spi.data.FieldSpec.FieldType.METRIC;
@@ -230,7 +228,8 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
         }
 
         // Check the field type matches.
-        FieldSpec.FieldType fieldTypeInMetadata = columnMetadata.getFieldType();
+        FieldSpec fieldSpecInMetadata = columnMetadata.getFieldSpec();
+        FieldSpec.FieldType fieldTypeInMetadata = fieldSpecInMetadata.getFieldType();
         if (fieldTypeInMetadata != fieldTypeInSchema) {
           String failureMessage = "Field type: " + fieldTypeInMetadata + " for auto-generated column: " + column
               + " does not match field type: " + fieldTypeInSchema
@@ -239,18 +238,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
         }
 
         // Check the data type and default value matches.
-        DataType dataTypeInMetadata = columnMetadata.getDataType();
+        DataType dataTypeInMetadata = fieldSpecInMetadata.getDataType();
         DataType dataTypeInSchema = fieldSpecInSchema.getDataType();
-        boolean isSingleValueInMetadata = columnMetadata.isSingleValue();
+        boolean isSingleValueInMetadata = fieldSpecInMetadata.isSingleValueField();
         boolean isSingleValueInSchema = fieldSpecInSchema.isSingleValueField();
-        String defaultValueInMetadata = columnMetadata.getDefaultNullValueString();
-
-        String defaultValueInSchema;
-        if (dataTypeInSchema == BYTES) {
-          defaultValueInSchema = BytesUtils.toHexString((byte[]) fieldSpecInSchema.getDefaultNullValue());
-        } else {
-          defaultValueInSchema = fieldSpecInSchema.getDefaultNullValue().toString();
-        }
+        String defaultValueInMetadata = fieldSpecInMetadata.getDefaultNullValueString();
+        String defaultValueInSchema = fieldSpecInSchema.getDefaultNullValueString();
 
         if (fieldTypeInMetadata == DIMENSION) {
           if (dataTypeInMetadata != dataTypeInSchema) {
@@ -302,7 +295,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
 
         // Only remove auto-generated columns.
         if (columnMetadata.isAutoGenerated()) {
-          FieldSpec.FieldType fieldTypeInMetadata = columnMetadata.getFieldType();
+          FieldSpec.FieldType fieldTypeInMetadata = columnMetadata.getFieldSpec().getFieldType();
           if (fieldTypeInMetadata == DIMENSION) {
             defaultColumnActionMap.put(column, DefaultColumnAction.REMOVE_DIMENSION);
           } else if (fieldTypeInMetadata == METRIC) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
index a446ace..f723fd5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
@@ -32,7 +32,7 @@ public class DefaultColumnHandlerFactory {
 
   public static DefaultColumnHandler getDefaultColumnHandler(File indexDir, SegmentMetadataImpl segmentMetadata,
       IndexLoadingConfig indexLoadingConfig, Schema schema, SegmentDirectory.Writer segmentWriter) {
-    if (SegmentVersion.valueOf(segmentMetadata.getVersion()) == SegmentVersion.v3) {
+    if (segmentMetadata.getVersion() == SegmentVersion.v3) {
       return new V3DefaultColumnHandler(indexDir, segmentMetadata, indexLoadingConfig, schema, segmentWriter);
     } else {
       return new V1DefaultColumnHandler(indexDir, segmentMetadata, indexLoadingConfig, schema, segmentWriter);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index bb74b24..d713e96 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -30,10 +30,10 @@ import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHea
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -61,7 +61,7 @@ public class H3IndexHandler {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     for (Map.Entry<String, H3IndexConfig> entry : indexLoadingConfig.getH3IndexConfigs().entrySet()) {
       String column = entry.getKey();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 625d4ea..f19d49e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -26,10 +26,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -48,12 +48,12 @@ public class InvertedIndexHandler {
   private final SegmentVersion _segmentVersion;
   private final Set<ColumnMetadata> _invertedIndexColumns = new HashSet<>();
 
-  public InvertedIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+  public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
       SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     // Only create inverted index on dictionary-encoded unsorted columns
     for (String column : indexLoadingConfig.getInvertedIndexColumns()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 3773989..568bd8a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -28,9 +28,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -52,12 +53,12 @@ public class JsonIndexHandler {
   private final SegmentVersion _segmentVersion;
   private final Set<ColumnMetadata> _jsonIndexColumns = new HashSet<>();
 
-  public JsonIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+  public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
       SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     for (String column : indexLoadingConfig.getJsonIndexColumns()) {
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java
index 8add430..9de3bf0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java
@@ -27,9 +27,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -67,12 +67,12 @@ public class LuceneFSTIndexHandler {
   private final SegmentVersion _segmentVersion;
   private final Set<ColumnMetadata> _fstIndexColumns = new HashSet<>();
 
-  public LuceneFSTIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, Set<String> fstIndexColumns,
+  public LuceneFSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, Set<String> fstIndexColumns,
       SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     for (String column : fstIndexColumns) {
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 60331a2..77fbca6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -26,10 +26,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -49,12 +49,12 @@ public class RangeIndexHandler {
   private final SegmentVersion _segmentVersion;
   private final Set<ColumnMetadata> _rangeIndexColumns = new HashSet<>();
 
-  public RangeIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+  public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
       SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     // Only create range index on dictionary-encoded unsorted columns
     for (String column : indexLoadingConfig.getRangeIndexColumns()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 3a06f17..878be6a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -45,9 +45,10 @@ import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
 import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.TextIndexType;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -91,12 +92,12 @@ public class TextIndexHandler {
   private final SegmentVersion _segmentVersion;
   private final Set<ColumnMetadata> _textIndexColumns = new HashSet<>();
 
-  public TextIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, Set<String> textIndexColumns,
+  public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, Set<String> textIndexColumns,
       SegmentDirectory.Writer segmentWriter) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentName = segmentMetadata.getName();
-    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+    _segmentVersion = segmentMetadata.getVersion();
 
     for (String column : textIndexColumns) {
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
@@ -155,8 +156,8 @@ public class TextIndexHandler {
       if (!hasDictionary) {
         // text index on raw column, just read the raw forward index
         VarByteChunkSVForwardIndexReader rawIndexReader = (VarByteChunkSVForwardIndexReader) forwardIndexReader;
-        BaseChunkSVForwardIndexReader.ChunkReaderContext
-            chunkReaderContext = (BaseChunkSVForwardIndexReader.ChunkReaderContext) readerContext;
+        BaseChunkSVForwardIndexReader.ChunkReaderContext chunkReaderContext =
+            (BaseChunkSVForwardIndexReader.ChunkReaderContext) readerContext;
         for (int docId = 0; docId < numDocs; docId++) {
           textIndexCreator.add(rawIndexReader.getString(docId, chunkReaderContext));
         }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index 35c5583..7a7eaa5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteOrder;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -134,39 +135,50 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
 
   @VisibleForTesting
   File getFileFor(String column, ColumnIndexType indexType) {
-    String filename;
+    String fileExtension;
     switch (indexType) {
       case DICTIONARY:
-        filename = _segmentMetadata.getDictionaryFileName(column);
+        fileExtension = V1Constants.Dict.FILE_EXTENSION;
         break;
       case FORWARD_INDEX:
-        filename = _segmentMetadata.getForwardIndexFileName(column);
+        ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
+        if (columnMetadata.isSingleValue()) {
+          if (!columnMetadata.hasDictionary()) {
+            fileExtension = V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION;
+          } else if (columnMetadata.isSorted()) {
+            fileExtension = V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+          } else {
+            fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+          }
+        } else {
+          fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+        }
         break;
       case INVERTED_INDEX:
-        filename = _segmentMetadata.getBitmapInvertedIndexFileName(column);
+        fileExtension = V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
         break;
       case RANGE_INDEX:
-        filename = _segmentMetadata.getBitmapRangeIndexFileName(column);
+        fileExtension = V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
         break;
       case BLOOM_FILTER:
-        filename = _segmentMetadata.getBloomFilterFileName(column);
+        fileExtension = V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
         break;
       case NULLVALUE_VECTOR:
-        filename = _segmentMetadata.getNullValueVectorFileName(column);
+        fileExtension = V1Constants.Indexes.NULLVALUE_VECTOR_FILE_EXTENSION;
         break;
       case TEXT_INDEX:
-        filename = column + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION;
+        fileExtension = V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION;
         break;
       case FST_INDEX:
-        filename = column + V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+        fileExtension = V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
         break;
       case JSON_INDEX:
-        filename = column + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+        fileExtension = V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
         break;
       default:
-        throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
+        throw new IllegalStateException("Unsupported index type: " + indexType);
     }
-    return new File(_segmentDirectory, filename);
+    return new File(_segmentDirectory, column + fileExtension);
   }
 
   private PinotDataBuffer mapForWrites(File file, long sizeBytes, String context)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index 128b7a5..f62bed7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import static org.apache.pinot.segment.spi.creator.SegmentVersion.v1;
 import static org.apache.pinot.segment.spi.creator.SegmentVersion.v2;
 import static org.apache.pinot.segment.spi.creator.SegmentVersion.v3;
-import static org.apache.pinot.segment.spi.creator.SegmentVersion.valueOf;
 
 
 public class SegmentLocalFSDirectory extends SegmentDirectory {
@@ -74,7 +73,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory {
     Preconditions.checkNotNull(metadata);
 
     _indexDir = directoryFile;
-    _segmentDirectory = getSegmentPath(directoryFile, metadata.getSegmentVersion());
+    _segmentDirectory = getSegmentPath(directoryFile, metadata.getVersion());
     Preconditions.checkState(_segmentDirectory.exists(), "Segment directory: " + directoryFile + " must exist");
 
     _segmentLock = new SegmentLock();
@@ -206,10 +205,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory {
       return;
     }
 
-    String version = _segmentMetadata.getVersion();
-    SegmentVersion segmentVersion = valueOf(version);
-
-    switch (segmentVersion) {
+    switch (_segmentMetadata.getVersion()) {
       case v1:
       case v2:
         _columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
index 1d1cb5c..a1770ef 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
@@ -21,7 +21,7 @@ package org.apache.pinot.segment.local.segment.virtualcolumn;
 import java.io.IOException;
 import org.apache.pinot.segment.local.segment.index.column.BaseVirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.index.readers.DocIdDictionary;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -47,10 +47,9 @@ public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
   }
 
   @Override
-  public ColumnMetadata buildMetadata(VirtualColumnContext context) {
-    ColumnMetadata.Builder columnMetadataBuilder = super.getColumnMetadataBuilder(context);
-    columnMetadataBuilder.setCardinality(context.getTotalDocCount()).setHasDictionary(true).setHasInvertedIndex(true)
-        .setSingleValue(true).setIsSorted(true);
+  public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+    ColumnMetadataImpl.Builder columnMetadataBuilder = super.getColumnMetadataBuilder(context);
+    columnMetadataBuilder.setCardinality(context.getTotalDocCount()).setSorted(true).setHasDictionary(true);
     return columnMetadataBuilder.build();
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
index 658a4a4..ee30e08 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.segment.local.segment.virtualcolumn;
 
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/StarTreeBuilderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/StarTreeBuilderUtils.java
index a991feb..c16bf3c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/StarTreeBuilderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/StarTreeBuilderUtils.java
@@ -32,8 +32,8 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -65,7 +65,7 @@ public class StarTreeBuilderUtils {
    * Generates the deduplicated star-tree builder configs.
    */
   public static List<StarTreeV2BuilderConfig> generateBuilderConfigs(@Nullable List<StarTreeIndexConfig> indexConfigs,
-      boolean enableDefaultStarTree, SegmentMetadataImpl segmentMetadata) {
+      boolean enableDefaultStarTree, SegmentMetadata segmentMetadata) {
     List<StarTreeV2BuilderConfig> builderConfigs = new ArrayList<>();
     if (indexConfigs != null) {
       for (StarTreeIndexConfig indexConfig : indexConfigs) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/MultipleTreesBuilder.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/MultipleTreesBuilder.java
index 31e6e2c..68303c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/MultipleTreesBuilder.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/MultipleTreesBuilder.java
@@ -38,7 +38,6 @@ import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.In
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexValue;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants.MetadataKey;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
@@ -108,8 +107,8 @@ public class MultipleTreesBuilder implements Closeable {
     Preconditions.checkState(!_metadataProperties.containsKey(MetadataKey.STAR_TREE_COUNT), "Star-tree already exists");
     _segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
     try {
-      _builderConfigs = StarTreeBuilderUtils.generateBuilderConfigs(indexConfigs, enableDefaultStarTree,
-          (SegmentMetadataImpl) _segment.getSegmentMetadata());
+      _builderConfigs = StarTreeBuilderUtils
+          .generateBuilderConfigs(indexConfigs, enableDefaultStarTree, _segment.getSegmentMetadata());
     } catch (Exception e) {
       _segment.destroy();
       throw e;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java
index e828506..1c8fde3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java
@@ -28,8 +28,8 @@ import java.util.Set;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -95,7 +95,7 @@ public class StarTreeV2BuilderConfig {
    *   <li>Use default value for max leaf records</li>
    * </ul>
    */
-  public static StarTreeV2BuilderConfig generateDefaultConfig(SegmentMetadataImpl segmentMetadata) {
+  public static StarTreeV2BuilderConfig generateDefaultConfig(SegmentMetadata segmentMetadata) {
     Schema schema = segmentMetadata.getSchema();
     List<ColumnMetadata> dimensionColumnMetadataList = new ArrayList<>();
     List<ColumnMetadata> timeColumnMetadataList = new ArrayList<>();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index e03b267..b3c5be2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -30,9 +30,9 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVFo
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.startree.OffHeapStarTree;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTree;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/IntArraysTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/IntArraysTest.java
index 94e96f5..3b40f62 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/IntArraysTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/IntArraysTest.java
@@ -28,11 +28,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.spi.utils.ReadMode;
@@ -92,8 +91,7 @@ public class IntArraysTest {
       throws Exception {
     ImmutableSegment heapSegment = ImmutableSegmentLoader.load(INDEX_DIR.listFiles()[0], ReadMode.heap);
     ImmutableSegment mmapSegment = ImmutableSegmentLoader.load(INDEX_DIR.listFiles()[0], ReadMode.mmap);
-    Map<String, ColumnMetadata> metadataMap =
-        ((SegmentMetadataImpl) heapSegment.getSegmentMetadata()).getColumnMetadataMap();
+    Map<String, ColumnMetadata> metadataMap = heapSegment.getSegmentMetadata().getColumnMetadataMap();
 
     for (String column : metadataMap.keySet()) {
       ForwardIndexReader heapForwardIndex = heapSegment.getForwardIndex(column);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index 88fd1ba..081e254 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -24,12 +24,13 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -79,64 +80,47 @@ public class ColumnMetadataTest {
     return config;
   }
 
-  public void verifySegmentAfterLoading(SegmentMetadataImpl metadata) {
+  public void verifySegmentAfterLoading(SegmentMetadata segmentMetadata) {
     // Multi-value numeric dimension column.
-    ColumnMetadata col7Meta = metadata.getColumnMetadataFor("column7");
-    Assert.assertEquals(col7Meta.getColumnName(), "column7");
+    ColumnMetadata col7Meta = segmentMetadata.getColumnMetadataFor("column7");
+    Assert.assertEquals(col7Meta.getFieldSpec(), new DimensionFieldSpec("column7", DataType.INT, false));
     Assert.assertEquals(col7Meta.getCardinality(), 359);
     Assert.assertEquals(col7Meta.getTotalDocs(), 100000);
-    Assert.assertEquals(col7Meta.getDataType(), FieldSpec.DataType.INT);
     Assert.assertEquals(col7Meta.getBitsPerElement(), 9);
     Assert.assertEquals(col7Meta.getColumnMaxLength(), 0);
-    Assert.assertEquals(col7Meta.getFieldType(), FieldSpec.FieldType.DIMENSION);
     Assert.assertFalse(col7Meta.isSorted());
-    Assert.assertFalse(col7Meta.hasNulls());
     Assert.assertTrue(col7Meta.hasDictionary());
-    Assert.assertTrue(col7Meta.hasInvertedIndex());
-    Assert.assertFalse(col7Meta.isSingleValue());
     Assert.assertEquals(col7Meta.getMaxNumberOfMultiValues(), 24);
     Assert.assertEquals(col7Meta.getTotalNumberOfEntries(), 134090);
     Assert.assertFalse(col7Meta.isAutoGenerated());
-    Assert.assertEquals(col7Meta.getDefaultNullValueString(), String.valueOf(Integer.MIN_VALUE));
 
     // Single-value string dimension column.
-    ColumnMetadata col3Meta = metadata.getColumnMetadataFor("column3");
-    Assert.assertEquals(col3Meta.getColumnName(), "column3");
+    ColumnMetadata col3Meta = segmentMetadata.getColumnMetadataFor("column3");
+    Assert.assertEquals(col3Meta.getFieldSpec(), new DimensionFieldSpec("column3", DataType.STRING, true));
     Assert.assertEquals(col3Meta.getCardinality(), 5);
     Assert.assertEquals(col3Meta.getTotalDocs(), 100000);
-    Assert.assertEquals(col3Meta.getDataType(), FieldSpec.DataType.STRING);
     Assert.assertEquals(col3Meta.getBitsPerElement(), 3);
     Assert.assertEquals(col3Meta.getColumnMaxLength(), 4);
-    Assert.assertEquals(col3Meta.getFieldType(), FieldSpec.FieldType.DIMENSION);
     Assert.assertFalse(col3Meta.isSorted());
-    Assert.assertFalse(col3Meta.hasNulls());
     Assert.assertTrue(col3Meta.hasDictionary());
-    Assert.assertTrue(col3Meta.hasInvertedIndex());
-    Assert.assertFalse(col3Meta.hasFSTIndex());
-    Assert.assertTrue(col3Meta.isSingleValue());
     Assert.assertEquals(col3Meta.getMaxNumberOfMultiValues(), 0);
     Assert.assertEquals(col3Meta.getTotalNumberOfEntries(), 100000);
     Assert.assertFalse(col3Meta.isAutoGenerated());
-    Assert.assertEquals(col3Meta.getDefaultNullValueString(), "null");
 
     // Time column.
-    ColumnMetadata timeColumn = metadata.getColumnMetadataFor("daysSinceEpoch");
+    // FIXME: Currently it is modeled as dimension in the auto-generated schema
+    ColumnMetadata timeColumn = segmentMetadata.getColumnMetadataFor("daysSinceEpoch");
+    Assert.assertEquals(timeColumn.getFieldSpec(), new DimensionFieldSpec("daysSinceEpoch", DataType.INT, true));
     Assert.assertEquals(timeColumn.getColumnName(), "daysSinceEpoch");
     Assert.assertEquals(timeColumn.getCardinality(), 1);
     Assert.assertEquals(timeColumn.getTotalDocs(), 100000);
-    Assert.assertEquals(timeColumn.getDataType(), FieldSpec.DataType.INT);
     Assert.assertEquals(timeColumn.getBitsPerElement(), 1);
     Assert.assertEquals(timeColumn.getColumnMaxLength(), 0);
-    Assert.assertEquals(timeColumn.getFieldType(), FieldSpec.FieldType.DIMENSION);
     Assert.assertTrue(timeColumn.isSorted());
-    Assert.assertFalse(timeColumn.hasNulls());
     Assert.assertTrue(timeColumn.hasDictionary());
-    Assert.assertTrue(timeColumn.hasInvertedIndex());
-    Assert.assertTrue(timeColumn.isSingleValue());
     Assert.assertEquals(timeColumn.getMaxNumberOfMultiValues(), 0);
     Assert.assertEquals(timeColumn.getTotalNumberOfEntries(), 100000);
     Assert.assertFalse(timeColumn.isAutoGenerated());
-    Assert.assertEquals(timeColumn.getDefaultNullValueString(), String.valueOf(Integer.MIN_VALUE));
   }
 
   @Test
@@ -150,11 +134,11 @@ public class ColumnMetadataTest {
 
     // Load segment metadata.
     IndexSegment segment = ImmutableSegmentLoader.load(INDEX_DIR.listFiles()[0], ReadMode.mmap);
-    SegmentMetadataImpl metadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
-    verifySegmentAfterLoading(metadata);
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    verifySegmentAfterLoading(segmentMetadata);
 
     // Make sure we got the creator name as well.
-    String creatorName = metadata.getCreatorName();
+    String creatorName = segmentMetadata.getCreatorName();
     Assert.assertEquals(creatorName, CREATOR_VERSION);
   }
 
@@ -169,12 +153,11 @@ public class ColumnMetadataTest {
 
     // Load segment metadata.
     IndexSegment segment = ImmutableSegmentLoader.load(INDEX_DIR.listFiles()[0], ReadMode.mmap);
-    SegmentMetadataImpl metadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
-    verifySegmentAfterLoading(metadata);
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    verifySegmentAfterLoading(segmentMetadata);
 
     // Make sure we get null for creator name.
-    String creatorName = metadata.getCreatorName();
-    Assert.assertEquals(creatorName, null);
+    Assert.assertNull(segmentMetadata.getCreatorName());
   }
 
   @Test
@@ -188,11 +171,7 @@ public class ColumnMetadataTest {
 
     // Load segment metadata.
     IndexSegment segment = ImmutableSegmentLoader.load(INDEX_DIR.listFiles()[0], ReadMode.mmap);
-    SegmentMetadataImpl metadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+    SegmentMetadata metadata = segment.getSegmentMetadata();
     verifySegmentAfterLoading(metadata);
-
-    // Make sure we get null for creator name.
-    char paddingCharacter = metadata.getPaddingCharacter();
-    Assert.assertEquals(paddingCharacter, '\0');
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/SegmentMetadataImplTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/SegmentMetadataImplTest.java
index d559923..46d24bd 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/SegmentMetadataImplTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/SegmentMetadataImplTest.java
@@ -26,9 +26,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -82,13 +82,10 @@ public class SegmentMetadataImplTest {
     JsonNode jsonMeta = metadata.toJson(null);
     assertEquals(jsonMeta.get("segmentName").asText(), metadata.getName());
     Assert.assertEquals(jsonMeta.get("crc").asLong(), Long.valueOf(metadata.getCrc()).longValue());
-    Assert.assertEquals(jsonMeta.get("paddingCharacter").asText(), String.valueOf(metadata.getPaddingCharacter()));
     Assert.assertTrue(jsonMeta.get("creatorName").isNull());
     assertEquals(jsonMeta.get("creationTimeMillis").asLong(), metadata.getIndexCreationTime());
     assertEquals(jsonMeta.get("startTimeMillis").asLong(), metadata.getTimeInterval().getStartMillis());
     assertEquals(jsonMeta.get("endTimeMillis").asLong(), metadata.getTimeInterval().getEndMillis());
-    assertEquals(jsonMeta.get("pushTimeMillis").asLong(), metadata.getPushTime());
-    assertEquals(jsonMeta.get("refreshTimeMillis").asLong(), metadata.getPushTime());
     assertEquals(jsonMeta.get("custom").get("k1").asText(), metadata.getCustomMap().get("k1"));
     assertEquals(jsonMeta.get("custom").get("k2").asText(), metadata.getCustomMap().get("k2"));
 
@@ -102,7 +99,6 @@ public class SegmentMetadataImplTest {
       assertEquals(jsonColumnMeta.get("cardinality").asInt(), columnMeta.getCardinality());
       assertEquals(jsonColumnMeta.get("bitsPerElement").asInt(), columnMeta.getBitsPerElement());
       assertEquals(jsonColumnMeta.get("sorted").asBoolean(), columnMeta.isSorted());
-      assertEquals(jsonColumnMeta.get("containsNulls").asBoolean(), columnMeta.hasNulls());
       assertEquals(jsonColumnMeta.get("hasDictionary").asBoolean(), columnMeta.hasDictionary());
     }
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProviderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProviderTest.java
index 2f9cd91..4c25b8e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProviderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProviderTest.java
@@ -18,160 +18,138 @@
  */
 package org.apache.pinot.segment.local.segment.index.column;
 
+import org.apache.pinot.segment.local.segment.index.readers.ConstantValueBytesDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.ConstantValueDoubleDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.ConstantValueFloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.ConstantValueIntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.ConstantValueLongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.ConstantValueStringDictionary;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.FieldSpec.FieldType;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 
-public class DefaultNullValueVirtualColumnProviderTest {
 
-  FieldSpec svStringFieldSpec = new DimensionFieldSpec("svStringColumn", DataType.STRING, true);
-  FieldSpec svIntFieldSpec = new DimensionFieldSpec("svIntColumn", DataType.INT, true);
-  FieldSpec svLongFieldSpec = new DimensionFieldSpec("svLongColumn", DataType.LONG, true);
-  FieldSpec svDoubleFieldSpec = new DimensionFieldSpec("svDoubleColumn", DataType.DOUBLE, true);
-  FieldSpec svFloatFieldSpec = new DimensionFieldSpec("svFloatColumn", DataType.FLOAT, true);
-  FieldSpec mvStringFieldSpec = new DimensionFieldSpec("mvStringColumn", DataType.STRING, false);
-  FieldSpec mvIntFieldSpec = new DimensionFieldSpec("mvIntColumn", DataType.INT, false);
-  FieldSpec mvLongFieldSpec = new DimensionFieldSpec("mvLongColumn", DataType.LONG, false);
-  FieldSpec mvDoubleFieldSpec = new DimensionFieldSpec("mvDoubleColumn", DataType.DOUBLE, false);
-  FieldSpec mvFloatFieldSpec = new DimensionFieldSpec("mvFloatColumn", DataType.FLOAT, false);
+public class DefaultNullValueVirtualColumnProviderTest {
+  private static final FieldSpec SV_INT = new DimensionFieldSpec("svIntColumn", DataType.INT, true);
+  private static final FieldSpec SV_LONG = new DimensionFieldSpec("svLongColumn", DataType.LONG, true);
+  private static final FieldSpec SV_FLOAT = new DimensionFieldSpec("svFloatColumn", DataType.FLOAT, true);
+  private static final FieldSpec SV_DOUBLE = new DimensionFieldSpec("svDoubleColumn", DataType.DOUBLE, true);
+  private static final FieldSpec SV_STRING = new DimensionFieldSpec("svStringColumn", DataType.STRING, true);
+  private static final FieldSpec SV_BYTES = new DimensionFieldSpec("svBytesColumn", DataType.BYTES, true);
+  private static final FieldSpec MV_INT = new DimensionFieldSpec("mvIntColumn", DataType.INT, false);
+  private static final FieldSpec MV_LONG = new DimensionFieldSpec("mvLongColumn", DataType.LONG, false);
+  private static final FieldSpec MV_FLOAT = new DimensionFieldSpec("mvFloatColumn", DataType.FLOAT, false);
+  private static final FieldSpec MV_DOUBLE = new DimensionFieldSpec("mvDoubleColumn", DataType.DOUBLE, false);
+  private static final FieldSpec MV_STRING = new DimensionFieldSpec("mvStringColumn", DataType.STRING, false);
 
   @Test
   public void testBuildMetadata() {
-    VirtualColumnContext virtualColumnContext = new VirtualColumnContext(svStringFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("svStringColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.STRING).setTotalDocs(1).setSingleValue(true).setDefaultNullValueString("null")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(true).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(svIntFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("svIntColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.INT).setTotalDocs(1).setSingleValue(true).setDefaultNullValueString("-2147483648")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(true).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(svLongFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("svLongColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.LONG).setTotalDocs(1).setSingleValue(true)
-            .setDefaultNullValueString("-9223372036854775808").setCardinality(1).setHasDictionary(true)
-            .setHasInvertedIndex(true).setIsSorted(true).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(svDoubleFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("svDoubleColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.DOUBLE).setTotalDocs(1).setSingleValue(true).setDefaultNullValueString("-Infinity")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(true).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(svFloatFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("svFloatColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.FLOAT).setTotalDocs(1).setSingleValue(true).setDefaultNullValueString("-Infinity")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(true).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(mvStringFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("mvStringColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.STRING).setTotalDocs(1).setSingleValue(false).setDefaultNullValueString("null")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(false).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(mvIntFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("mvIntColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.INT).setTotalDocs(1).setSingleValue(false).setDefaultNullValueString("-2147483648")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(false).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(mvLongFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("mvLongColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.LONG).setTotalDocs(1).setSingleValue(false)
-            .setDefaultNullValueString("-9223372036854775808").setCardinality(1).setHasDictionary(true)
-            .setHasInvertedIndex(true).setIsSorted(false).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(mvDoubleFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("mvDoubleColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.DOUBLE).setTotalDocs(1).setSingleValue(false).setDefaultNullValueString("-Infinity")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(false).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
-
-    virtualColumnContext = new VirtualColumnContext(mvFloatFieldSpec, 1);
-    Assert.assertEquals(
-        new ColumnMetadata.Builder().setVirtual(true).setColumnName("mvFloatColumn").setFieldType(FieldType.DIMENSION)
-            .setDataType(DataType.FLOAT).setTotalDocs(1).setSingleValue(false).setDefaultNullValueString("-Infinity")
-            .setCardinality(1).setHasDictionary(true).setHasInvertedIndex(true).setIsSorted(false).build(),
-        new DefaultNullValueVirtualColumnProvider().buildMetadata(virtualColumnContext));
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_INT, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_INT).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_LONG, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_LONG).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_FLOAT, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_FLOAT).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_DOUBLE, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_DOUBLE).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_STRING, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_STRING).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(SV_BYTES, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(SV_BYTES).setTotalDocs(1).setCardinality(1).setSorted(true)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(MV_INT, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(MV_INT).setTotalDocs(1).setCardinality(1).setSorted(false)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(MV_LONG, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(MV_LONG).setTotalDocs(1).setCardinality(1).setSorted(false)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(MV_FLOAT, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(MV_FLOAT).setTotalDocs(1).setCardinality(1).setSorted(false)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(MV_DOUBLE, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(MV_DOUBLE).setTotalDocs(1).setCardinality(1).setSorted(false)
+            .setHasDictionary(true).build());
+
+    assertEquals(new DefaultNullValueVirtualColumnProvider().buildMetadata(new VirtualColumnContext(MV_STRING, 1)),
+        new ColumnMetadataImpl.Builder().setFieldSpec(MV_STRING).setTotalDocs(1).setCardinality(1).setSorted(false)
+            .setHasDictionary(true).build());
   }
 
   @Test
   public void testBuildDictionary() {
-    VirtualColumnContext virtualColumnContext = new VirtualColumnContext(svStringFieldSpec, 1);
+    VirtualColumnContext virtualColumnContext = new VirtualColumnContext(SV_INT, 1);
     Dictionary dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueStringDictionary.class, dictionary.getClass());
-    Assert.assertEquals("null", dictionary.getStringValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueIntDictionary.class);
+    assertEquals(dictionary.getIntValue(0), Integer.MIN_VALUE);
+
+    virtualColumnContext = new VirtualColumnContext(SV_LONG, 1);
+    dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
+    assertEquals(dictionary.getClass(), ConstantValueLongDictionary.class);
+    assertEquals(dictionary.getLongValue(0), Long.MIN_VALUE);
 
-    virtualColumnContext = new VirtualColumnContext(svIntFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(SV_FLOAT, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueIntDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Integer.MIN_VALUE, dictionary.getIntValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueFloatDictionary.class);
+    assertEquals(dictionary.getFloatValue(0), Float.NEGATIVE_INFINITY);
 
-    virtualColumnContext = new VirtualColumnContext(svLongFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(SV_DOUBLE, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueLongDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Long.MIN_VALUE, dictionary.getLongValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueDoubleDictionary.class);
+    assertEquals(dictionary.getDoubleValue(0), Double.NEGATIVE_INFINITY);
 
-    virtualColumnContext = new VirtualColumnContext(svDoubleFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(SV_STRING, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueDoubleDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Double.NEGATIVE_INFINITY, dictionary.getDoubleValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueStringDictionary.class);
+    assertEquals(dictionary.getStringValue(0), "null");
 
-    virtualColumnContext = new VirtualColumnContext(svFloatFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(SV_BYTES, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueFloatDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Float.NEGATIVE_INFINITY, dictionary.getFloatValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueBytesDictionary.class);
+    assertEquals(dictionary.getBytesValue(0), new byte[0]);
 
-    virtualColumnContext = new VirtualColumnContext(mvStringFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(MV_INT, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueStringDictionary.class, dictionary.getClass());
-    Assert.assertEquals("null", dictionary.getStringValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueIntDictionary.class);
+    assertEquals(dictionary.getIntValue(0), Integer.MIN_VALUE);
 
-    virtualColumnContext = new VirtualColumnContext(mvIntFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(MV_LONG, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueIntDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Integer.MIN_VALUE, dictionary.getIntValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueLongDictionary.class);
+    assertEquals(dictionary.getLongValue(0), Long.MIN_VALUE);
 
-    virtualColumnContext = new VirtualColumnContext(mvLongFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(MV_FLOAT, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueLongDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Long.MIN_VALUE, dictionary.getLongValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueFloatDictionary.class);
+    assertEquals(dictionary.getFloatValue(0), Float.NEGATIVE_INFINITY);
 
-    virtualColumnContext = new VirtualColumnContext(mvDoubleFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(MV_DOUBLE, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueDoubleDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Double.NEGATIVE_INFINITY, dictionary.getDoubleValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueDoubleDictionary.class);
+    assertEquals(dictionary.getDoubleValue(0), Double.NEGATIVE_INFINITY);
 
-    virtualColumnContext = new VirtualColumnContext(mvFloatFieldSpec, 1);
+    virtualColumnContext = new VirtualColumnContext(MV_STRING, 1);
     dictionary = new DefaultNullValueVirtualColumnProvider().buildDictionary(virtualColumnContext);
-    Assert.assertEquals(ConstantValueFloatDictionary.class, dictionary.getClass());
-    Assert.assertEquals(Float.NEGATIVE_INFINITY, dictionary.getFloatValue(0));
+    assertEquals(dictionary.getClass(), ConstantValueStringDictionary.class);
+    assertEquals(dictionary.getStringValue(0), "null");
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
index 38beff3..6481976 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
@@ -100,7 +100,7 @@ public class SegmentV1V2ToV3FormatConverterTest {
     Assert.assertTrue(v3Location.isDirectory());
 
     SegmentMetadataImpl metadata = new SegmentMetadataImpl(v3Location);
-    Assert.assertEquals(metadata.getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(metadata.getVersion(), SegmentVersion.v3);
     Assert.assertTrue(new File(v3Location, V1Constants.SEGMENT_CREATION_META).exists());
     FileTime afterConversionTime = Files.getLastModifiedTime(v3Location.toPath());
 
@@ -109,7 +109,7 @@ public class SegmentV1V2ToV3FormatConverterTest {
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_segmentDirectory, _v3IndexLoadingConfig);
     Assert.assertNotNull(indexSegment);
     Assert.assertEquals(indexSegment.getSegmentName(), metadata.getName());
-    Assert.assertEquals(SegmentVersion.v3, SegmentVersion.valueOf(indexSegment.getSegmentMetadata().getVersion()));
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
 
     FileTime afterLoadTime = Files.getLastModifiedTime(v3Location.toPath());
     Assert.assertEquals(afterConversionTime, afterLoadTime);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithBytesTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
index 75c821a..35bb6bb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
@@ -155,7 +155,7 @@ public class SegmentGenerationWithBytesTypeTest {
   @Test
   public void testMetadata() {
     Assert.assertTrue(_segment.getDataSource(FIXED_BYTE_SORTED_COLUMN).getDataSourceMetadata().isSorted());
-    Assert.assertFalse(_segment.getSegmentMetadata().hasDictionary(FIXED_BYTES_NO_DICT_COLUMN));
+    Assert.assertFalse(_segment.getSegmentMetadata().getColumnMetadataFor(FIXED_BYTES_NO_DICT_COLUMN).hasDictionary());
   }
 
   @Test
@@ -207,8 +207,8 @@ public class SegmentGenerationWithBytesTypeTest {
     IndexSegment segment = buildSegmentFromAvro(schema, AVRO_DIR_NAME, AVRO_NAME, SEGMENT_NAME);
     SegmentMetadata metadata = segment.getSegmentMetadata();
 
-    Assert.assertTrue(metadata.hasDictionary(FIXED_BYTES_UNSORTED_COLUMN));
-    Assert.assertTrue(metadata.hasDictionary(VARIABLE_BYTES_COLUMN));
+    Assert.assertTrue(metadata.getColumnMetadataFor(FIXED_BYTES_UNSORTED_COLUMN).hasDictionary());
+    Assert.assertTrue(metadata.getColumnMetadataFor(VARIABLE_BYTES_COLUMN).hasDictionary());
 
     PinotSegmentRecordReader reader = new PinotSegmentRecordReader(new File(AVRO_DIR_NAME, SEGMENT_NAME));
     GenericRow row = new GenericRow();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
index c693704..450ef6c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentPartitionTest.java
@@ -33,10 +33,10 @@ import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.ModuloPartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -104,15 +104,15 @@ public class SegmentPartitionTest {
    */
   @Test
   public void testMetadata() {
-    SegmentMetadataImpl metadata = (SegmentMetadataImpl) _segment.getSegmentMetadata();
-    ColumnMetadata columnMetadata = metadata.getColumnMetadataFor(PARTITIONED_COLUMN_NAME);
+    SegmentMetadata segmentMetadata = _segment.getSegmentMetadata();
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(PARTITIONED_COLUMN_NAME);
 
     Assert.assertTrue(columnMetadata.getPartitionFunction() instanceof ModuloPartitionFunction);
 
     Set<Integer> actualPartitions = columnMetadata.getPartitions();
     Assert.assertEquals(actualPartitions, _expectedPartitions);
 
-    columnMetadata = metadata.getColumnMetadataFor(NON_PARTITIONED_COLUMN_NAME);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NON_PARTITIONED_COLUMN_NAME);
     Assert.assertNull(columnMetadata.getPartitionFunction());
     Assert.assertNull(columnMetadata.getPartitions());
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index 53a631b..7d7871c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -33,12 +33,12 @@ import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
@@ -123,7 +123,7 @@ public class LoaderTest {
   public void testLoad()
       throws Exception {
     constructV1Segment();
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v1);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v1);
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
 
     testConversion();
@@ -148,19 +148,19 @@ public class LoaderTest {
       throws Exception {
     // Do not set segment version, should not convert the segment
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, ReadMode.mmap);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // Set segment version to v1, should not convert the segment
     indexSegment = ImmutableSegmentLoader.load(_indexDir, _v1IndexLoadingConfig);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // Set segment version to v3, should convert the segment to v3
     indexSegment = ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     indexSegment.destroy();
   }
@@ -308,7 +308,7 @@ public class LoaderTest {
   public void testFSTIndexLoad()
       throws Exception {
     constructSegmentWithFSTIndex(SegmentVersion.v3);
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v3);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v3);
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     File fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
@@ -319,7 +319,7 @@ public class LoaderTest {
     indexLoadingConfig.setReadMode(ReadMode.mmap);
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should have is V3)
     verifyIndexDirIsV3(_indexDir);
@@ -336,7 +336,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should have is V3)
     verifyIndexDirIsV3(_indexDir);
@@ -354,7 +354,7 @@ public class LoaderTest {
     constructSegmentWithFSTIndex(SegmentVersion.v1);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v1);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v1);
     // check that segment v1 dir exists
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
@@ -374,7 +374,7 @@ public class LoaderTest {
     indexLoadingConfig.setReadMode(ReadMode.mmap);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
@@ -391,7 +391,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
@@ -408,7 +408,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     // the index dir should exist in v3 format due to conversion
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
@@ -449,7 +449,7 @@ public class LoaderTest {
     constructSegmentWithTextIndex(SegmentVersion.v3);
 
     // check that segment on-disk version is V3 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v3);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v3);
     // check that V3 index sub-dir exists
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should have is V3)
@@ -470,7 +470,7 @@ public class LoaderTest {
     indexLoadingConfig.setReadMode(ReadMode.mmap);
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should have is V3)
@@ -499,7 +499,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should have is V3)
@@ -529,7 +529,7 @@ public class LoaderTest {
     constructSegmentWithTextIndex(SegmentVersion.v1);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v1);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v1);
     // check that segment v1 dir exists
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
@@ -551,7 +551,7 @@ public class LoaderTest {
     indexLoadingConfig.setReadMode(ReadMode.mmap);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
@@ -577,7 +577,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
     Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
@@ -603,7 +603,7 @@ public class LoaderTest {
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
     indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
     // the index dir should exist in v3 format due to conversion
     Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index b96744a..9e9d3ba 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -36,12 +36,11 @@ import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
 import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.creator.TextIndexType;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -51,12 +50,12 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
@@ -228,15 +227,17 @@ public class SegmentPreProcessorTest {
     _indexLoadingConfig.setFSTIndexColumns(fstColumns);
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
     constructV3Segment();
-    SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-        .load(_indexDir.toURI(), _configuration);
-    SegmentPreProcessor v3Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema4);
+    SegmentDirectory segmentDirectory =
+        SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader().load(_indexDir.toURI(), _configuration);
+    SegmentPreProcessor v3Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema4);
     Assert.expectThrows(UnsupportedOperationException.class, () -> v3Processor.process());
 
     constructV1Segment();
-    segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-        .load(_indexDir.toURI(), _configuration);
-    SegmentPreProcessor v1Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema4);
+    segmentDirectory =
+        SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader().load(_indexDir.toURI(), _configuration);
+    SegmentPreProcessor v1Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema4);
     Assert.expectThrows(UnsupportedOperationException.class, () -> v1Processor.process());
   }
 
@@ -323,18 +324,14 @@ public class SegmentPreProcessorTest {
     constructV3Segment();
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
-    // column exists and does not have text index enabled
     Assert.assertNotNull(columnMetadata);
-    Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.NONE);
     checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
 
     // Create a segment in V1, add a new column with text index enabled
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
     columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
-    // column exists and does not have text index enabled
     Assert.assertNotNull(columnMetadata);
-    Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.NONE);
     checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
   }
 
@@ -356,9 +353,7 @@ public class SegmentPreProcessorTest {
     constructV3Segment();
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
-    // column exists and does not have text index enabled
     Assert.assertNotNull(columnMetadata);
-    Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.NONE);
     // SegmentPreprocessor should have created the text index using TextIndexHandler
     checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26);
 
@@ -366,9 +361,7 @@ public class SegmentPreProcessorTest {
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
     columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
-    // column exists and does not have text index enabled
     Assert.assertNotNull(columnMetadata);
-    Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.NONE);
     // SegmentPreprocessor should have created the text index using TextIndexHandler
     checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26);
   }
@@ -383,43 +376,33 @@ public class SegmentPreProcessorTest {
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
-    ColumnMetadata columnMetadata =
-        checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
-            hasDictionary, isSorted, dictionaryElementSize);
-    Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.LUCENE);
+    checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
+        isSorted, dictionaryElementSize);
   }
 
-  private ColumnMetadata checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits,
-      Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
+  private void checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
+      boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
 
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
       processor.process();
       SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
-
+      Assert.assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, true));
       Assert.assertEquals(columnMetadata.getCardinality(), cardinality);
       Assert.assertEquals(columnMetadata.getTotalDocs(), 100000);
-      Assert.assertEquals(columnMetadata.getDataType(), DataType.STRING);
       Assert.assertEquals(columnMetadata.getBitsPerElement(), bits);
       Assert.assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
-      Assert.assertEquals(columnMetadata.getFieldType(), FieldSpec.FieldType.DIMENSION);
       Assert.assertEquals(columnMetadata.isSorted(), isSorted);
-      Assert.assertFalse(columnMetadata.hasNulls());
       Assert.assertEquals(columnMetadata.hasDictionary(), hasDictionary);
-      Assert.assertTrue(columnMetadata.isSingleValue());
       Assert.assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0);
       Assert.assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
       Assert.assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
-      Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "null");
 
-      try (
-          SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-              .load(_indexDir.toURI(), _configuration);
-          SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
+      try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+          .load(_indexDir.toURI(), _configuration); SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
         Assert.assertTrue(reader.hasIndexFor(column, indexType));
         Assert.assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
         // if the text index is enabled on a new column with dictionary,
@@ -430,7 +413,6 @@ public class SegmentPreProcessorTest {
           Assert.assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
         }
       }
-      return columnMetadata;
     }
   }
 
@@ -440,12 +422,13 @@ public class SegmentPreProcessorTest {
     constructV1Segment();
 
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
-    Assert.assertEquals(segmentMetadata.getSegmentVersion(), SegmentVersion.v1);
+    Assert.assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
+
+    String col1FileName = COLUMN1_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
+    String col7FileName = COLUMN7_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
+    String col13FileName = COLUMN13_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
+    String badColFileName = NO_SUCH_COLUMN_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
 
-    String col1FileName = segmentMetadata.getBitmapInvertedIndexFileName(COLUMN1_NAME);
-    String col7FileName = segmentMetadata.getBitmapInvertedIndexFileName(COLUMN7_NAME);
-    String col13FileName = segmentMetadata.getBitmapInvertedIndexFileName(COLUMN13_NAME);
-    String badColFileName = segmentMetadata.getBitmapInvertedIndexFileName(NO_SUCH_COLUMN_NAME);
     File col1File = new File(_indexDir, col1FileName);
     File col7File = new File(_indexDir, col7FileName);
     File col13File = new File(_indexDir, col13FileName);
@@ -491,7 +474,7 @@ public class SegmentPreProcessorTest {
     constructV3Segment();
 
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
-    Assert.assertEquals(segmentMetadata.getSegmentVersion(), SegmentVersion.v3);
+    Assert.assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
 
     File segmentDirectoryPath = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3);
     File singleFileIndex = new File(segmentDirectoryPath, "columns.psf");
@@ -504,10 +487,8 @@ public class SegmentPreProcessorTest {
     // Create inverted index the first time.
     checkInvertedIndexCreation(false);
     long addedLength = 0L;
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration); SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
       // 8 bytes overhead is for checking integrity of the segment.
       addedLength += reader.getIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX).size() + 8;
       addedLength += reader.getIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX).size() + 8;
@@ -528,10 +509,8 @@ public class SegmentPreProcessorTest {
 
   private void checkInvertedIndexCreation(boolean reCreate)
       throws Exception {
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration); SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
       if (reCreate) {
         Assert.assertTrue(reader.hasIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX));
         Assert.assertTrue(reader.hasIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX));
@@ -545,17 +524,14 @@ public class SegmentPreProcessorTest {
       }
     }
 
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
       processor.process();
     }
 
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration); SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
       Assert.assertTrue(reader.hasIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX));
       Assert.assertTrue(reader.hasIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX));
       Assert.assertTrue(reader.hasIndexFor(COLUMN7_NAME, ColumnIndexType.INVERTED_INDEX));
@@ -575,40 +551,36 @@ public class SegmentPreProcessorTest {
     // Try to use the third schema and update default value again.
     // For the third schema, we changed the default value for column 'newStringMVDimension' to 'notSameLength', which
     // is not the same length as before. This should be fine for segment format v1.
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema3)) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchema3)) {
       processor.process();
     }
 
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     ColumnMetadata hllMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
-    Assert.assertEquals(hllMetricMetadata.getDataType(), DataType.BYTES);
-    String expectedDefaultValueString =
-        "00000008000000ac00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
-    ByteArray expectedDefaultValue = BytesUtils.toByteArray(expectedDefaultValueString);
+    FieldSpec expectedHllMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
+    Assert.assertEquals(hllMetricMetadata.getFieldSpec(), expectedHllMetricFieldSpec);
+    ByteArray expectedDefaultValue = new ByteArray((byte[]) expectedHllMetricFieldSpec.getDefaultNullValue());
     Assert.assertEquals(hllMetricMetadata.getMinValue(), expectedDefaultValue);
     Assert.assertEquals(hllMetricMetadata.getMaxValue(), expectedDefaultValue);
-    Assert.assertEquals(hllMetricMetadata.getDefaultNullValueString(), expectedDefaultValueString);
 
     ColumnMetadata tDigestMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
-    Assert.assertEquals(tDigestMetricMetadata.getDataType(), DataType.BYTES);
-    expectedDefaultValueString =
-        "0000000141ba085ee15d2f3241ba085ee15d2f324059000000000000000000013ff000000000000041ba085ee15d2f32";
-    expectedDefaultValue = BytesUtils.toByteArray(expectedDefaultValueString);
+    FieldSpec expectedTDigestMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
+    Assert.assertEquals(tDigestMetricMetadata.getFieldSpec(), expectedTDigestMetricFieldSpec);
+    expectedDefaultValue = new ByteArray((byte[]) expectedTDigestMetricFieldSpec.getDefaultNullValue());
     Assert.assertEquals(tDigestMetricMetadata.getMinValue(), expectedDefaultValue);
     Assert.assertEquals(tDigestMetricMetadata.getMaxValue(), expectedDefaultValue);
-    Assert.assertEquals(tDigestMetricMetadata.getDefaultNullValueString(), expectedDefaultValueString);
   }
 
   private void checkUpdateDefaultColumns()
       throws Exception {
     // Update default value.
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema1)) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchema1)) {
       processor.process();
     }
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -616,65 +588,57 @@ public class SegmentPreProcessorTest {
     // Check column metadata.
     // Check all field for one column, and do necessary checks for other columns.
     ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_METRIC_COLUMN_NAME);
+    Assert.assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_INT_METRIC_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getCardinality(), 1);
     Assert.assertEquals(columnMetadata.getTotalDocs(), 100000);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.INT);
     Assert.assertEquals(columnMetadata.getBitsPerElement(), 1);
     Assert.assertEquals(columnMetadata.getColumnMaxLength(), 0);
-    Assert.assertEquals(columnMetadata.getFieldType(), FieldSpec.FieldType.METRIC);
     Assert.assertTrue(columnMetadata.isSorted());
-    Assert.assertFalse(columnMetadata.hasNulls());
     Assert.assertTrue(columnMetadata.hasDictionary());
-    Assert.assertTrue(columnMetadata.hasInvertedIndex());
-    Assert.assertTrue(columnMetadata.isSingleValue());
     Assert.assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0);
     Assert.assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
     Assert.assertTrue(columnMetadata.isAutoGenerated());
     Assert.assertEquals(columnMetadata.getMinValue(), 1);
     Assert.assertEquals(columnMetadata.getMaxValue(), 1);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "1");
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_LONG_METRIC_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.LONG);
+    Assert.assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_LONG_METRIC_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getMinValue(), 0L);
     Assert.assertEquals(columnMetadata.getMaxValue(), 0L);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "0");
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_FLOAT_METRIC_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.FLOAT);
+    Assert
+        .assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_FLOAT_METRIC_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getMinValue(), 0f);
     Assert.assertEquals(columnMetadata.getMaxValue(), 0f);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "0.0");
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_DOUBLE_METRIC_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.DOUBLE);
+    Assert
+        .assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_DOUBLE_METRIC_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getMinValue(), 0.0);
     Assert.assertEquals(columnMetadata.getMaxValue(), 0.0);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "0.0");
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.BOOLEAN);
+    Assert.assertEquals(columnMetadata.getFieldSpec(),
+        _newColumnsSchema1.getFieldSpecFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getColumnMaxLength(), 0);
-    Assert.assertEquals(columnMetadata.getFieldType(), FieldSpec.FieldType.DIMENSION);
     Assert.assertEquals(columnMetadata.getMinValue(), 0);
     Assert.assertEquals(columnMetadata.getMaxValue(), 0);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "0");
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.STRING);
+    Assert.assertEquals(columnMetadata.getFieldSpec(),
+        _newColumnsSchema1.getFieldSpecFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME));
     Assert.assertEquals(columnMetadata.getColumnMaxLength(), 4);
     Assert.assertFalse(columnMetadata.isSorted());
-    Assert.assertFalse(columnMetadata.isSingleValue());
     Assert.assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 1);
     Assert.assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
     Assert.assertEquals(columnMetadata.getMinValue(), "null");
     Assert.assertEquals(columnMetadata.getMaxValue(), "null");
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "null");
 
     // Derived column
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME);
-    Assert.assertEquals(columnMetadata.getDataType(), DataType.INT);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), Integer.toString(Integer.MIN_VALUE));
+    Assert.assertEquals(columnMetadata.getFieldSpec(),
+        _newColumnsSchema1.getFieldSpecFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
     Assert.assertTrue(columnMetadata.isAutoGenerated());
     ColumnMetadata originalColumnMetadata = segmentMetadata.getColumnMetadataFor(COLUMN1_NAME);
     Assert.assertEquals(columnMetadata.getCardinality(), originalColumnMetadata.getCardinality());
@@ -684,10 +648,8 @@ public class SegmentPreProcessorTest {
     Assert.assertEquals(columnMetadata.getMaxValue(), (int) originalColumnMetadata.getMaxValue() + 1);
 
     // Check dictionary and forward index exist.
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration); SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
       Assert.assertTrue(reader.hasIndexFor(NEW_INT_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
       Assert.assertTrue(reader.hasIndexFor(NEW_INT_METRIC_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
       Assert.assertTrue(reader.hasIndexFor(NEW_LONG_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
@@ -707,10 +669,10 @@ public class SegmentPreProcessorTest {
     // Use the second schema and update default value again.
     // For the second schema, we changed the default value for column 'newIntMetric' to 2, and added default value
     // 'abcd' (keep the same length as 'null') to column 'newStringMVDimension'.
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
-        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchema2)) {
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
+        SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchema2)) {
       processor.process();
     }
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -719,12 +681,12 @@ public class SegmentPreProcessorTest {
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_METRIC_COLUMN_NAME);
     Assert.assertEquals(columnMetadata.getMinValue(), 2);
     Assert.assertEquals(columnMetadata.getMaxValue(), 2);
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "2");
+    Assert.assertEquals(columnMetadata.getFieldSpec().getDefaultNullValue(), 2);
 
     columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME);
     Assert.assertEquals(columnMetadata.getMinValue(), "abcd");
     Assert.assertEquals(columnMetadata.getMaxValue(), "abcd");
-    Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "abcd");
+    Assert.assertEquals(columnMetadata.getFieldSpec().getDefaultNullValue(), "abcd");
   }
 
   @Test
@@ -746,9 +708,8 @@ public class SegmentPreProcessorTest {
 
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
     indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE);
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
       processor.process();
     }
@@ -764,9 +725,8 @@ public class SegmentPreProcessorTest {
     Assert.assertNull(metricColumnMetadata.getMaxValue());
 
     indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.TIME);
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
       processor.process();
     }
@@ -782,9 +742,8 @@ public class SegmentPreProcessorTest {
     Assert.assertNull(metricColumnMetadata.getMaxValue());
 
     indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NON_METRIC);
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
       processor.process();
     }
@@ -800,9 +759,8 @@ public class SegmentPreProcessorTest {
     Assert.assertNull(metricColumnMetadata.getMaxValue());
 
     indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL);
-    try (
-        SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
-            .load(_indexDir.toURI(), _configuration);
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader()
+        .load(_indexDir.toURI(), _configuration);
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
       processor.process();
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/ColumnIndexDirectoryTestHelper.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/ColumnIndexDirectoryTestHelper.java
index 02fc463..0edb8ea 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/ColumnIndexDirectoryTestHelper.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/ColumnIndexDirectoryTestHelper.java
@@ -19,17 +19,18 @@
 package org.apache.pinot.segment.local.segment.store;
 
 import java.io.IOException;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
 
 public class ColumnIndexDirectoryTestHelper {
   static ColumnIndexType[] indexTypes =
@@ -80,45 +81,12 @@ public class ColumnIndexDirectoryTestHelper {
   }
 
   static SegmentMetadataImpl writeMetadata(SegmentVersion version) {
-    SegmentMetadataImpl meta = Mockito.mock(SegmentMetadataImpl.class);
-    Mockito.when(meta.getVersion()).thenReturn(version.toString());
-    Mockito.when(meta.getSegmentVersion()).thenReturn(version);
-    Mockito.when(meta.getDictionaryFileName(ArgumentMatchers.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return invocationOnMock.getArguments()[0] + ".dict";
-      }
-    });
-    Mockito.when(meta.getForwardIndexFileName(ArgumentMatchers.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return invocationOnMock.getArguments()[0] + ".fwd";
-      }
-    });
-
-    Mockito.when(meta.getBitmapInvertedIndexFileName(ArgumentMatchers.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return invocationOnMock.getArguments()[0] + ".ii";
-      }
-    });
-    Mockito.when(meta.getBloomFilterFileName(ArgumentMatchers.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return invocationOnMock.getArguments()[0] + ".bloom";
-      }
-    });
-    Mockito.when(meta.getNullValueVectorFileName(ArgumentMatchers.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return invocationOnMock.getArguments()[0] + ".nullvalue";
-      }
-    });
-    return meta;
+    SegmentMetadataImpl segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getVersion()).thenReturn(version);
+    ColumnMetadata columnMetadata = Mockito.mock(ColumnMetadata.class);
+    when(columnMetadata.isSingleValue()).thenReturn(true);
+    when(columnMetadata.isSorted()).thenReturn(false);
+    when(segmentMetadata.getColumnMetadataFor(anyString())).thenReturn(columnMetadata);
+    return segmentMetadata;
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 1c3f806..4e826ab 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -67,7 +67,7 @@ public class SingleFileIndexDirectoryTest {
 
   void writeMetadata() {
     SegmentMetadataImpl meta = Mockito.mock(SegmentMetadataImpl.class);
-    Mockito.when(meta.getVersion()).thenReturn(SegmentVersion.v3.toString());
+    Mockito.when(meta.getVersion()).thenReturn(SegmentVersion.v3);
     segmentMetadata = meta;
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java
index fc8e46f..115eb71 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java
@@ -22,8 +22,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.Constants;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/ColumnMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/ColumnMetadata.java
new file mode 100644
index 0000000..62796ee
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/ColumnMetadata.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+
+
+/**
+ * The <code>ColumnMetadata</code> interface holds the column-level management information and data statistics.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("rawtypes")
+public interface ColumnMetadata {
+
+  FieldSpec getFieldSpec();
+
+  default String getColumnName() {
+    return getFieldSpec().getName();
+  }
+
+  default FieldType getFieldType() {
+    return getFieldSpec().getFieldType();
+  }
+
+  default DataType getDataType() {
+    return getFieldSpec().getDataType();
+  }
+
+  default boolean isSingleValue() {
+    return getFieldSpec().isSingleValueField();
+  }
+
+  int getTotalDocs();
+
+  /**
+   * NOTE: When a realtime segment has no-dictionary columns, the cardinality for those columns will be set to
+   * {@link Constants#UNKNOWN_CARDINALITY}.
+   */
+  int getCardinality();
+
+  boolean isSorted();
+
+  Comparable getMinValue();
+
+  Comparable getMaxValue();
+
+  @JsonProperty
+  boolean hasDictionary();
+
+  int getColumnMaxLength();
+
+  char getPaddingCharacter();
+
+  int getBitsPerElement();
+
+  int getMaxNumberOfMultiValues();
+
+  int getTotalNumberOfEntries();
+
+  @Nullable
+  PartitionFunction getPartitionFunction();
+
+  @Nullable
+  Set<Integer> getPartitions();
+
+  boolean isAutoGenerated();
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index 738b70b..3e977cb 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pinot.segment.spi;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.data.Schema;
 import org.joda.time.Duration;
@@ -39,6 +45,8 @@ public interface SegmentMetadata {
   @Deprecated
   String getTableName();
 
+  String getName();
+
   String getTimeColumn();
 
   long getStartTime();
@@ -53,31 +61,20 @@ public interface SegmentMetadata {
 
   String getCrc();
 
-  String getVersion();
+  SegmentVersion getVersion();
 
   Schema getSchema();
 
-  String getShardingKey();
-
   int getTotalDocs();
 
   File getIndexDir();
 
-  String getName();
+  @Nullable
+  String getCreatorName();
 
   long getIndexCreationTime();
 
   /**
-   * Get the last time that this segment was pushed or <code>Long.MIN_VALUE</code> if it has never been pushed.
-   */
-  long getPushTime();
-
-  /**
-   * Get the last time that this segment was refreshed or <code>Long.MIN_VALUE</code> if it has never been refreshed.
-   */
-  long getRefreshTime();
-
-  /**
    * Return the last time a record was indexed in this segment. Applicable for MutableSegments.
    *
    * @return time when the last record was indexed
@@ -93,25 +90,29 @@ public interface SegmentMetadata {
    */
   long getLatestIngestionTimestamp();
 
-  boolean hasDictionary(String columnName);
+  List<StarTreeV2Metadata> getStarTreeV2MetadataList();
 
-  String getForwardIndexFileName(String column);
-
-  String getDictionaryFileName(String column);
-
-  String getBitmapInvertedIndexFileName(String column);
-
-  String getBitmapRangeIndexFileName(String column);
-
-  String getBloomFilterFileName(String column);
+  Map<String, String> getCustomMap();
 
-  String getNullValueVectorFileName(String column);
+  default Set<String> getAllColumns() {
+    return getSchema().getColumnNames();
+  }
 
-  String getCreatorName();
+  Map<String, ColumnMetadata> getColumnMetadataMap();
 
-  char getPaddingCharacter();
+  default ColumnMetadata getColumnMetadataFor(String column) {
+    return getColumnMetadataMap().get(column);
+  }
 
-  boolean close();
+  /**
+   * Removes a column from the segment metadata.
+   */
+  void removeColumn(String column);
 
-  Map<String, String> getCustomMap();
+  /**
+   * Converts segment metadata to JSON.
+   * @param columnFilter list only the columns in the set. Lists all the columns if the parameter value is null.
+   * @return JSON representation of segment metadata.
+   */
+  JsonNode toJson(@Nullable Set<String> columnFilter);
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 0fe691b..b4596ba 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -81,11 +81,7 @@ public class V1Constants {
       public static final String DICTIONARY_ELEMENT_SIZE = "lengthOfEachEntry";
       public static final String COLUMN_TYPE = "columnType";
       public static final String IS_SORTED = "isSorted";
-      public static final String HAS_NULL_VALUE = "hasNullValue";
       public static final String HAS_DICTIONARY = "hasDictionary";
-      public static final String HAS_INVERTED_INDEX = "hasInvertedIndex";
-      public static final String HAS_FST_INDEX = "hasFSTIndex";
-      public static final String HAS_JSON_INDEX = "hasJsonIndex";
       public static final String IS_SINGLE_VALUED = "isSingleValues";
       public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues";
       public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
@@ -98,7 +94,21 @@ public class V1Constants {
       public static final String PARTITION_VALUES = "partitionValues";
       public static final String DATETIME_FORMAT = "datetimeFormat";
       public static final String DATETIME_GRANULARITY = "datetimeGranularity";
+
+      // TODO: Remove these 2 fields after releasing 0.8.0 because they are always set to true and never used
+      @Deprecated
+      public static final String HAS_NULL_VALUE = "hasNullValue";
+      @Deprecated
+      public static final String HAS_INVERTED_INDEX = "hasInvertedIndex";
+
+      // TODO: Remove these 3 fields after releasing 0.8.0 because the index info is maintained within the DataSource
+      //       based on the actual indexes loaded
+      @Deprecated
+      public static final String HAS_FST_INDEX = "hasFSTIndex";
+      @Deprecated
       public static final String TEXT_INDEX_TYPE = "textIndexType";
+      @Deprecated
+      public static final String HAS_JSON_INDEX = "hasJsonIndex";
 
       public static final String COLUMN_PROPS_KEY_PREFIX = "column.";
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadata.java
deleted file mode 100644
index 0b1a61e0..0000000
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadata.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.index.metadata;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.lang.reflect.Field;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.index.creator.TextIndexType;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
-import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.FieldSpec.FieldType;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.BytesUtils;
-import org.apache.pinot.spi.utils.EqualityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.pinot.spi.data.FieldSpec.DataType.valueOf;
-
-
-public class ColumnMetadata {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnMetadata.class);
-
-  private final FieldSpec fieldSpec;
-  private final String columnName;
-  private final int cardinality;
-  private final int totalDocs;
-  private final DataType dataType;
-  private final int bitsPerElement;
-  private final int columnMaxLength;
-  private final FieldType fieldType;
-  private final boolean isSorted;
-  @JsonProperty
-  private final boolean containsNulls;
-  @JsonProperty
-  private final boolean hasDictionary;
-  @JsonProperty
-  private final boolean hasInvertedIndex;
-  private final boolean hasFSTIndex;
-  private final boolean isSingleValue;
-  private final boolean isVirtual;
-  private final int maxNumberOfMultiValues;
-  private final int totalNumberOfEntries;
-  private final boolean isAutoGenerated;
-  private final String defaultNullValueString;
-  private final TimeUnit timeUnit;
-  private final char paddingCharacter;
-  private final Comparable minValue;
-  private final Comparable maxValue;
-  private final PartitionFunction partitionFunction;
-  private final int numPartitions;
-  private final Set<Integer> _partitions;
-  private final String dateTimeFormat;
-  private final String dateTimeGranularity;
-  private final TextIndexType textIndexType;
-
-  public static ColumnMetadata fromPropertiesConfiguration(String column, PropertiesConfiguration config) {
-    Builder builder = new Builder();
-
-    builder.setColumnName(column);
-    builder.setCardinality(
-        config.getInt(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.CARDINALITY)));
-    int totalDocs =
-        config.getInt(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.TOTAL_DOCS));
-    builder.setTotalDocs(totalDocs);
-    DataType dataType = valueOf(
-        config.getString(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.DATA_TYPE))
-            .toUpperCase());
-    builder.setDataType(dataType);
-    builder.setBitsPerElement(config
-        .getInt(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.BITS_PER_ELEMENT)));
-    builder.setColumnMaxLength(config.getInt(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE)));
-    builder.setFieldType(FieldType.valueOf(
-        config.getString(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.COLUMN_TYPE))
-            .toUpperCase()));
-    builder.setIsSorted(config
-        .getBoolean(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.IS_SORTED)));
-    builder.setContainsNulls(config
-        .getBoolean(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.HAS_NULL_VALUE)));
-    builder.setHasDictionary(config
-        .getBoolean(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.HAS_DICTIONARY),
-            true));
-    builder.setHasInvertedIndex(config.getBoolean(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.HAS_INVERTED_INDEX)));
-    builder.setHasFSTIndex(config
-        .getBoolean(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.HAS_FST_INDEX),
-            false));
-    builder.setSingleValue(config.getBoolean(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.IS_SINGLE_VALUED)));
-    builder.setMaxNumberOfMultiValues(config.getInt(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.MAX_MULTI_VALUE_ELEMENTS)));
-    builder.setTotalNumberOfEntries(config.getInt(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES)));
-    builder.setAutoGenerated(config.getBoolean(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.IS_AUTO_GENERATED), false));
-    builder.setDefaultNullValueString(config.getString(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.DEFAULT_NULL_VALUE), null));
-    builder.setTimeUnit(
-        TimeUnit.valueOf(config.getString(V1Constants.MetadataKeys.Segment.TIME_UNIT, "DAYS").toUpperCase()));
-    builder.setTextIndexType(config
-        .getString(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.TEXT_INDEX_TYPE),
-            TextIndexType.NONE.name()));
-
-    char paddingCharacter = V1Constants.Str.LEGACY_STRING_PAD_CHAR;
-    if (config.containsKey(V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER)) {
-      String padding = config.getString(V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER);
-      paddingCharacter = StringEscapeUtils.unescapeJava(padding).charAt(0);
-    }
-    builder.setPaddingCharacter(paddingCharacter);
-
-    String dateTimeFormat = config
-        .getString(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.DATETIME_FORMAT),
-            null);
-    if (dateTimeFormat != null) {
-      builder.setDateTimeFormat(dateTimeFormat);
-    }
-
-    String dateTimeGranularity = config.getString(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.DATETIME_GRANULARITY), null);
-    if (dateTimeGranularity != null) {
-      builder.setDateTimeGranularity(dateTimeGranularity);
-    }
-
-    // Set min/max value if available
-    // NOTE: Use getProperty() instead of getString() to avoid variable substitution ('${anotherKey}'), which can cause
-    //       problem for special values such as '$${' where the first '$' is identified as escape character.
-    // TODO: Use getProperty() for other properties as well to avoid the overhead of variable substitution
-    String minString = (String) config
-        .getProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.MIN_VALUE));
-    String maxString = (String) config
-        .getProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.MAX_VALUE));
-    if (minString != null && maxString != null) {
-      switch (dataType.getStoredType()) {
-        case INT:
-          builder.setMinValue(Integer.valueOf(minString));
-          builder.setMaxValue(Integer.valueOf(maxString));
-          break;
-        case LONG:
-          builder.setMinValue(Long.valueOf(minString));
-          builder.setMaxValue(Long.valueOf(maxString));
-          break;
-        case FLOAT:
-          builder.setMinValue(Float.valueOf(minString));
-          builder.setMaxValue(Float.valueOf(maxString));
-          break;
-        case DOUBLE:
-          builder.setMinValue(Double.valueOf(minString));
-          builder.setMaxValue(Double.valueOf(maxString));
-          break;
-        case STRING:
-          builder.setMinValue(minString);
-          builder.setMaxValue(maxString);
-          break;
-        case BYTES:
-          builder.setMinValue(BytesUtils.toByteArray(minString));
-          builder.setMaxValue(BytesUtils.toByteArray(maxString));
-          break;
-        default:
-          throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + column);
-      }
-    }
-
-    String partitionFunctionName = config.getString(
-        V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.PARTITION_FUNCTION));
-    if (partitionFunctionName != null) {
-      int numPartitions = config
-          .getInt(V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.NUM_PARTITIONS));
-      PartitionFunction partitionFunction =
-          PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
-      builder.setPartitionFunction(partitionFunction);
-      builder.setNumPartitions(numPartitions);
-      builder.setPartitions(ColumnPartitionMetadata.extractPartitions(config.getList(
-          V1Constants.MetadataKeys.Column.getKeyFor(column, V1Constants.MetadataKeys.Column.PARTITION_VALUES))));
-    }
-
-    return builder.build();
-  }
-
-  public PartitionFunction getPartitionFunction() {
-    return partitionFunction;
-  }
-
-  public int getNumPartitions() {
-    return numPartitions;
-  }
-
-  public Set<Integer> getPartitions() {
-    return _partitions;
-  }
-
-  public static class Builder {
-    private String columnName;
-    private int cardinality;
-    private int totalDocs;
-    private DataType dataType;
-    private int bitsPerElement;
-    private int columnMaxLength;
-    private FieldType fieldType;
-    private boolean isSorted;
-    private boolean containsNulls;
-    private boolean hasDictionary;
-    private boolean hasInvertedIndex;
-    private boolean hasFSTIndex;
-    private boolean isSingleValue;
-    private boolean isVirtual;
-    private int maxNumberOfMultiValues;
-    private int totalNumberOfEntries;
-    private boolean isAutoGenerated;
-    private String defaultNullValueString;
-    private TimeUnit timeUnit;
-    private char paddingCharacter;
-    private Comparable minValue;
-    private Comparable maxValue;
-    private PartitionFunction partitionFunction;
-    private int numPartitions;
-    private Set<Integer> _partitions;
-    private String dateTimeFormat;
-    private String dateTimeGranularity;
-    private String textIndexType = TextIndexType.NONE.name();
-
-    public Builder setColumnName(String columnName) {
-      this.columnName = columnName;
-      return this;
-    }
-
-    public Builder setCardinality(int cardinality) {
-      this.cardinality = cardinality;
-      return this;
-    }
-
-    public Builder setTotalDocs(int totalDocs) {
-      this.totalDocs = totalDocs;
-      return this;
-    }
-
-    public Builder setDataType(DataType dataType) {
-      this.dataType = dataType;
-      return this;
-    }
-
-    public Builder setBitsPerElement(int bitsPerElement) {
-      this.bitsPerElement = bitsPerElement;
-      return this;
-    }
-
-    public Builder setColumnMaxLength(int columnMaxLength) {
-      this.columnMaxLength = columnMaxLength;
-      return this;
-    }
-
-    public Builder setFieldType(FieldType fieldType) {
-      this.fieldType = fieldType;
-      return this;
-    }
-
-    public Builder setIsSorted(boolean isSorted) {
-      this.isSorted = isSorted;
-      return this;
-    }
-
-    public Builder setContainsNulls(boolean containsNulls) {
-      this.containsNulls = containsNulls;
-      return this;
-    }
-
-    public Builder setHasDictionary(boolean hasDictionary) {
-      this.hasDictionary = hasDictionary;
-      return this;
-    }
-
-    public Builder setHasFSTIndex(boolean hasFSTIndex) {
-      this.hasFSTIndex = hasFSTIndex;
-      return this;
-    }
-
-    public Builder setHasInvertedIndex(boolean hasInvertedIndex) {
-      this.hasInvertedIndex = hasInvertedIndex;
-      return this;
-    }
-
-    public Builder setSingleValue(boolean singleValue) {
-      this.isSingleValue = singleValue;
-      return this;
-    }
-
-    public Builder setMaxNumberOfMultiValues(int maxNumberOfMultiValues) {
-      this.maxNumberOfMultiValues = maxNumberOfMultiValues;
-      return this;
-    }
-
-    public Builder setTotalNumberOfEntries(int totalNumberOfEntries) {
-      this.totalNumberOfEntries = totalNumberOfEntries;
-      return this;
-    }
-
-    public Builder setAutoGenerated(boolean isAutoGenerated) {
-      this.isAutoGenerated = isAutoGenerated;
-      return this;
-    }
-
-    public Builder setVirtual(boolean isVirtual) {
-      this.isVirtual = isVirtual;
-      return this;
-    }
-
-    public Builder setDefaultNullValueString(String defaultNullValueString) {
-      this.defaultNullValueString = defaultNullValueString;
-      return this;
-    }
-
-    public Builder setTimeUnit(TimeUnit timeUnit) {
-      this.timeUnit = timeUnit;
-      return this;
-    }
-
-    public Builder setPaddingCharacter(char paddingCharacter) {
-      this.paddingCharacter = paddingCharacter;
-      return this;
-    }
-
-    public Builder setMinValue(Comparable minValue) {
-      this.minValue = minValue;
-      return this;
-    }
-
-    public Builder setMaxValue(Comparable maxValue) {
-      this.maxValue = maxValue;
-      return this;
-    }
-
-    public Builder setPartitionFunction(PartitionFunction partitionFunction) {
-      this.partitionFunction = partitionFunction;
-      return this;
-    }
-
-    public void setNumPartitions(int numPartitions) {
-      this.numPartitions = numPartitions;
-    }
-
-    public Builder setPartitions(Set<Integer> partitions) {
-      _partitions = partitions;
-      return this;
-    }
-
-    public Builder setDateTimeFormat(String dateTimeFormat) {
-      this.dateTimeFormat = dateTimeFormat;
-      return this;
-    }
-
-    public Builder setDateTimeGranularity(String dateTimeGranularity) {
-      this.dateTimeGranularity = dateTimeGranularity;
-      return this;
-    }
-
-    public Builder setTextIndexType(String textIndexType) {
-      this.textIndexType = textIndexType;
-      return this;
-    }
-
-    public ColumnMetadata build() {
-      return new ColumnMetadata(columnName, cardinality, totalDocs, dataType, bitsPerElement, columnMaxLength,
-          fieldType, isSorted, containsNulls, hasDictionary, hasInvertedIndex, isSingleValue, maxNumberOfMultiValues,
-          totalNumberOfEntries, isAutoGenerated, isVirtual, defaultNullValueString, timeUnit, paddingCharacter,
-          minValue, maxValue, partitionFunction, numPartitions, _partitions, dateTimeFormat, dateTimeGranularity,
-          hasFSTIndex, TextIndexType.valueOf(textIndexType));
-    }
-  }
-
-  private ColumnMetadata(String columnName, int cardinality, int totalDocs, DataType dataType, int bitsPerElement,
-      int columnMaxLength, FieldType fieldType, boolean isSorted, boolean hasNulls, boolean hasDictionary,
-      boolean hasInvertedIndex, boolean isSingleValue, int maxNumberOfMultiValues, int totalNumberOfEntries,
-      boolean isAutoGenerated, boolean isVirtual, String defaultNullValueString, TimeUnit timeUnit,
-      char paddingCharacter, Comparable minValue, Comparable maxValue, PartitionFunction partitionFunction,
-      int numPartitions, Set<Integer> partitions, String dateTimeFormat, String dateTimeGranularity,
-      boolean hasFSTIndex, TextIndexType textIndexType) {
-    this.columnName = columnName;
-    this.cardinality = cardinality;
-    this.totalDocs = totalDocs;
-    this.dataType = dataType;
-    this.bitsPerElement = bitsPerElement;
-    this.columnMaxLength = columnMaxLength;
-    this.fieldType = fieldType;
-    this.isSorted = isSorted;
-    this.containsNulls = hasNulls;
-    this.hasDictionary = hasDictionary;
-    this.hasInvertedIndex = hasInvertedIndex;
-    this.hasFSTIndex = hasFSTIndex;
-    this.isSingleValue = isSingleValue;
-    this.maxNumberOfMultiValues = maxNumberOfMultiValues;
-    this.totalNumberOfEntries = totalNumberOfEntries;
-    this.isAutoGenerated = isAutoGenerated;
-    this.isVirtual = isVirtual;
-    this.defaultNullValueString = defaultNullValueString;
-    this.timeUnit = timeUnit;
-    this.paddingCharacter = paddingCharacter;
-    this.minValue = minValue;
-    this.maxValue = maxValue;
-    this.partitionFunction = partitionFunction;
-    this.numPartitions = numPartitions;
-    _partitions = partitions;
-    this.dateTimeFormat = dateTimeFormat;
-    this.dateTimeGranularity = dateTimeGranularity;
-    this.textIndexType = textIndexType;
-
-    switch (fieldType) {
-      case DIMENSION:
-        this.fieldSpec = new DimensionFieldSpec(columnName, dataType, isSingleValue);
-        break;
-      case METRIC:
-        this.fieldSpec = new MetricFieldSpec(columnName, dataType);
-        break;
-      case TIME:
-        this.fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, columnName));
-        break;
-      case DATE_TIME:
-        this.fieldSpec = new DateTimeFieldSpec(columnName, dataType, dateTimeFormat, dateTimeGranularity);
-        break;
-      default:
-        throw new RuntimeException("Unsupported field type: " + fieldType);
-    }
-  }
-
-  public String getColumnName() {
-    return columnName;
-  }
-
-  /**
-   * When a realtime segment has no-dictionary columns, the cardinality for those columns will be
-   * set to Constants.UNKNOWN_CARDINALITY
-   *
-   * @return The cardinality of the column.
-   */
-  public int getCardinality() {
-    return cardinality;
-  }
-
-  public int getTotalDocs() {
-    return totalDocs;
-  }
-
-  public DataType getDataType() {
-    return dataType;
-  }
-
-  public int getBitsPerElement() {
-    return bitsPerElement;
-  }
-
-  public int getColumnMaxLength() {
-    return columnMaxLength;
-  }
-
-  public FieldType getFieldType() {
-    return fieldType;
-  }
-
-  public boolean isSorted() {
-    return isSorted;
-  }
-
-  public boolean hasNulls() {
-    return containsNulls;
-  }
-
-  public boolean hasDictionary() {
-    return hasDictionary;
-  }
-
-  public boolean hasInvertedIndex() {
-    return hasInvertedIndex;
-  }
-
-  public boolean hasFSTIndex() {
-    return hasFSTIndex;
-  }
-
-  public boolean isSingleValue() {
-    return isSingleValue;
-  }
-
-  public int getMaxNumberOfMultiValues() {
-    return maxNumberOfMultiValues;
-  }
-
-  public int getTotalNumberOfEntries() {
-    return totalNumberOfEntries;
-  }
-
-  public boolean isAutoGenerated() {
-    return isAutoGenerated;
-  }
-
-  public boolean isVirtual() {
-    return isVirtual;
-  }
-
-  public String getDefaultNullValueString() {
-    return defaultNullValueString;
-  }
-
-  public TimeUnit getTimeUnit() {
-    return timeUnit;
-  }
-
-  public char getPaddingCharacter() {
-    return paddingCharacter;
-  }
-
-  public FieldSpec getFieldSpec() {
-    return fieldSpec;
-  }
-
-  public Comparable getMinValue() {
-    return minValue;
-  }
-
-  public Comparable getMaxValue() {
-    return maxValue;
-  }
-
-  public String getDateTimeFormat() {
-    return dateTimeFormat;
-  }
-
-  public String getDateTimeGranularity() {
-    return dateTimeGranularity;
-  }
-
-  public TextIndexType getTextIndexType() {
-    return textIndexType;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder result = new StringBuilder();
-    final String newLine = System.getProperty("line.separator");
-
-    result.append(this.getClass().getName());
-    result.append(" Object {");
-    result.append(newLine);
-
-    //determine fields declared in this class only (no fields of superclass)
-    final Field[] fields = this.getClass().getDeclaredFields();
-
-    //print field names paired with their values
-    for (final Field field : fields) {
-      result.append("  ");
-      try {
-        result.append(field.getName());
-        result.append(": ");
-        //requires access to private field:
-        result.append(field.get(this));
-      } catch (final IllegalAccessException ex) {
-        if (LOGGER.isErrorEnabled()) {
-          LOGGER.error("Unable to access field " + field, ex);
-        }
-        result.append("[ERROR]");
-      }
-      result.append(newLine);
-    }
-    result.append("}");
-
-    return result.toString();
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (this == object) {
-      return true;
-    }
-    if (object instanceof ColumnMetadata) {
-      ColumnMetadata columnMetadata = (ColumnMetadata) object;
-      return StringUtils.equals(getColumnName(), columnMetadata.getColumnName()) && getCardinality() == columnMetadata
-          .getCardinality() && getTotalDocs() == columnMetadata.getTotalDocs() && getDataType()
-          .equals(columnMetadata.getDataType()) && getBitsPerElement() == columnMetadata.getBitsPerElement()
-          && getFieldSpec().equals(columnMetadata.getFieldSpec()) && isSorted() == columnMetadata.isSorted()
-          && hasNulls() == columnMetadata.hasNulls() && hasDictionary() == columnMetadata.hasDictionary()
-          && hasInvertedIndex() == columnMetadata.hasInvertedIndex() && isSingleValue() == columnMetadata
-          .isSingleValue() && isVirtual() == columnMetadata.isVirtual() && getMaxNumberOfMultiValues() == columnMetadata
-          .getMaxNumberOfMultiValues() && getTotalNumberOfEntries() == columnMetadata.getTotalNumberOfEntries()
-          && isAutoGenerated() == columnMetadata.isAutoGenerated() && StringUtils
-          .equals(getDefaultNullValueString(), columnMetadata.getDefaultNullValueString())
-          && getTimeUnit() == (columnMetadata.getTimeUnit()) && getPaddingCharacter() == columnMetadata
-          .getPaddingCharacter() && EqualityUtils.isEqual(getMinValue(), columnMetadata.getMinValue()) && EqualityUtils
-          .isEqual(getMaxValue(), columnMetadata.getMaxValue()) && getPartitionFunction() == (columnMetadata
-          .getPartitionFunction()) && getNumPartitions() == columnMetadata.getNumPartitions() && EqualityUtils
-          .isEqualSet(getPartitions(), columnMetadata.getPartitions()) && StringUtils
-          .equals(getDateTimeFormat(), columnMetadata.getDateTimeFormat()) && StringUtils
-          .equals(getDateTimeGranularity(), columnMetadata.getDateTimeGranularity()) && hasFSTIndex() == columnMetadata
-          .hasFSTIndex() && getTextIndexType().equals(columnMetadata.getTextIndexType());
-    }
-    return false;
-  }
-}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
new file mode 100644
index 0000000..19dbb2a
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.metadata;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+public class ColumnMetadataImpl implements ColumnMetadata {
+  private final FieldSpec _fieldSpec;
+  private final int _totalDocs;
+  private final int _cardinality;
+  private final boolean _sorted;
+  private final Comparable<?> _minValue;
+  private final Comparable<?> _maxValue;
+  private final boolean _hasDictionary;
+  private final int _columnMaxLength;
+  private final char _paddingCharacter;
+  private final int _bitsPerElement;
+  private final int _maxNumberOfMultiValues;
+  private final int _totalNumberOfEntries;
+  private final PartitionFunction _partitionFunction;
+  private final Set<Integer> _partitions;
+  private final boolean _autoGenerated;
+
+  private ColumnMetadataImpl(FieldSpec fieldSpec, int totalDocs, int cardinality, boolean sorted,
+      Comparable<?> minValue, Comparable<?> maxValue, boolean hasDictionary, int columnMaxLength, char paddingCharacter,
+      int bitsPerElement, int maxNumberOfMultiValues, int totalNumberOfEntries,
+      @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, boolean autoGenerated) {
+    _fieldSpec = fieldSpec;
+    _totalDocs = totalDocs;
+    _cardinality = cardinality;
+    _sorted = sorted;
+    _minValue = minValue;
+    _maxValue = maxValue;
+    _hasDictionary = hasDictionary;
+    _columnMaxLength = columnMaxLength;
+    _paddingCharacter = paddingCharacter;
+    _bitsPerElement = bitsPerElement;
+    _maxNumberOfMultiValues = maxNumberOfMultiValues;
+    _totalNumberOfEntries = totalNumberOfEntries;
+    _partitionFunction = partitionFunction;
+    _partitions = partitions;
+    _autoGenerated = autoGenerated;
+  }
+
+  @Override
+  public FieldSpec getFieldSpec() {
+    return _fieldSpec;
+  }
+
+  @Override
+  public int getTotalDocs() {
+    return _totalDocs;
+  }
+
+  @Override
+  public int getCardinality() {
+    return _cardinality;
+  }
+
+  @Override
+  public boolean isSorted() {
+    return _sorted;
+  }
+
+  @Override
+  public Comparable<?> getMinValue() {
+    return _minValue;
+  }
+
+  @Override
+  public Comparable<?> getMaxValue() {
+    return _maxValue;
+  }
+
+  @Override
+  public boolean hasDictionary() {
+    return _hasDictionary;
+  }
+
+  @Override
+  public int getColumnMaxLength() {
+    return _columnMaxLength;
+  }
+
+  @Override
+  public char getPaddingCharacter() {
+    return _paddingCharacter;
+  }
+
+  @Override
+  public int getBitsPerElement() {
+    return _bitsPerElement;
+  }
+
+  @Override
+  public int getMaxNumberOfMultiValues() {
+    return _maxNumberOfMultiValues;
+  }
+
+  @Override
+  public int getTotalNumberOfEntries() {
+    return _totalNumberOfEntries;
+  }
+
+  @Nullable
+  @Override
+  public PartitionFunction getPartitionFunction() {
+    return _partitionFunction;
+  }
+
+  @Nullable
+  @Override
+  public Set<Integer> getPartitions() {
+    return _partitions;
+  }
+
+  @Override
+  public boolean isAutoGenerated() {
+    return _autoGenerated;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnMetadataImpl that = (ColumnMetadataImpl) o;
+    return _totalDocs == that._totalDocs && _cardinality == that._cardinality && _sorted == that._sorted
+        && _hasDictionary == that._hasDictionary && _columnMaxLength == that._columnMaxLength
+        && _paddingCharacter == that._paddingCharacter && _bitsPerElement == that._bitsPerElement
+        && _maxNumberOfMultiValues == that._maxNumberOfMultiValues
+        && _totalNumberOfEntries == that._totalNumberOfEntries && _autoGenerated == that._autoGenerated && Objects
+        .equals(_fieldSpec, that._fieldSpec) && Objects.equals(_minValue, that._minValue) && Objects
+        .equals(_maxValue, that._maxValue) && Objects.equals(_partitionFunction, that._partitionFunction) && Objects
+        .equals(_partitions, that._partitions);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects
+        .hash(_fieldSpec, _totalDocs, _cardinality, _sorted, _minValue, _maxValue, _hasDictionary, _columnMaxLength,
+            _paddingCharacter, _bitsPerElement, _maxNumberOfMultiValues, _totalNumberOfEntries, _partitionFunction,
+            _partitions, _autoGenerated);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnMetadataImpl{" + "_fieldSpec=" + _fieldSpec + ", _totalDocs=" + _totalDocs + ", _cardinality="
+        + _cardinality + ", _sorted=" + _sorted + ", _minValue=" + _minValue + ", _maxValue=" + _maxValue
+        + ", _hasDictionary=" + _hasDictionary + ", _columnMaxLength=" + _columnMaxLength + ", _paddingCharacter="
+        + _paddingCharacter + ", _bitsPerElement=" + _bitsPerElement + ", _maxNumberOfMultiValues="
+        + _maxNumberOfMultiValues + ", _totalNumberOfEntries=" + _totalNumberOfEntries + ", _partitionFunction="
+        + _partitionFunction + ", _partitions=" + _partitions + ", _autoGenerated=" + _autoGenerated + '}';
+  }
+
+  public static ColumnMetadataImpl fromPropertiesConfiguration(String column, PropertiesConfiguration config) {
+    Builder builder = new Builder().setTotalDocs(config.getInt(Column.getKeyFor(column, Column.TOTAL_DOCS)))
+        .setCardinality(config.getInt(Column.getKeyFor(column, Column.CARDINALITY)))
+        .setSorted(config.getBoolean(Column.getKeyFor(column, Column.IS_SORTED), false))
+        .setHasDictionary(config.getBoolean(Column.getKeyFor(column, Column.HAS_DICTIONARY), true))
+        .setBitsPerElement(config.getInt(Column.getKeyFor(column, Column.BITS_PER_ELEMENT)))
+        .setColumnMaxLength(config.getInt(Column.getKeyFor(column, Column.DICTIONARY_ELEMENT_SIZE)))
+        .setMaxNumberOfMultiValues(config.getInt(Column.getKeyFor(column, Column.MAX_MULTI_VALUE_ELEMENTS)))
+        .setTotalNumberOfEntries(config.getInt(Column.getKeyFor(column, Column.TOTAL_NUMBER_OF_ENTRIES)))
+        .setAutoGenerated(config.getBoolean(Column.getKeyFor(column, Column.IS_AUTO_GENERATED), false));
+
+    FieldType fieldType =
+        FieldType.valueOf(config.getString(Column.getKeyFor(column, Column.COLUMN_TYPE)).toUpperCase());
+    DataType dataType = DataType.valueOf(config.getString(Column.getKeyFor(column, Column.DATA_TYPE)).toUpperCase());
+    String defaultNullValueString = config.getString(Column.getKeyFor(column, Column.DEFAULT_NULL_VALUE), null);
+    FieldSpec fieldSpec;
+    switch (fieldType) {
+      case DIMENSION:
+        boolean isSingleValue = config.getBoolean(Column.getKeyFor(column, Column.IS_SINGLE_VALUED));
+        fieldSpec = new DimensionFieldSpec(column, dataType, isSingleValue, defaultNullValueString);
+        break;
+      case METRIC:
+        fieldSpec = new MetricFieldSpec(column, dataType, defaultNullValueString);
+        break;
+      case TIME:
+        TimeUnit timeUnit = TimeUnit.valueOf(config.getString(Segment.TIME_UNIT, "DAYS").toUpperCase());
+        fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, column));
+        break;
+      case DATE_TIME:
+        String format = config.getString(Column.getKeyFor(column, Column.DATETIME_FORMAT));
+        String granularity = config.getString(Column.getKeyFor(column, Column.DATETIME_GRANULARITY));
+        fieldSpec = new DateTimeFieldSpec(column, dataType, format, granularity, defaultNullValueString, null);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported field type: " + fieldType);
+    }
+    builder.setFieldSpec(fieldSpec);
+
+    // Set min/max value if available
+    // NOTE: Use getProperty() instead of getString() to avoid variable substitution ('${anotherKey}'), which can cause
+    //       problem for special values such as '$${' where the first '$' is identified as escape character.
+    // TODO: Use getProperty() for other properties as well to avoid the overhead of variable substitution
+    String minString = (String) config.getProperty(Column.getKeyFor(column, Column.MIN_VALUE));
+    String maxString = (String) config.getProperty(Column.getKeyFor(column, Column.MAX_VALUE));
+    if (minString != null && maxString != null) {
+      switch (dataType.getStoredType()) {
+        case INT:
+          builder.setMinValue(Integer.valueOf(minString));
+          builder.setMaxValue(Integer.valueOf(maxString));
+          break;
+        case LONG:
+          builder.setMinValue(Long.valueOf(minString));
+          builder.setMaxValue(Long.valueOf(maxString));
+          break;
+        case FLOAT:
+          builder.setMinValue(Float.valueOf(minString));
+          builder.setMaxValue(Float.valueOf(maxString));
+          break;
+        case DOUBLE:
+          builder.setMinValue(Double.valueOf(minString));
+          builder.setMaxValue(Double.valueOf(maxString));
+          break;
+        case STRING:
+          builder.setMinValue(minString);
+          builder.setMaxValue(maxString);
+          break;
+        case BYTES:
+          builder.setMinValue(BytesUtils.toByteArray(minString));
+          builder.setMaxValue(BytesUtils.toByteArray(maxString));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + column);
+      }
+    }
+
+    char paddingCharacter = V1Constants.Str.LEGACY_STRING_PAD_CHAR;
+    String padding = config.getString(Segment.SEGMENT_PADDING_CHARACTER, null);
+    if (padding != null) {
+      paddingCharacter = StringEscapeUtils.unescapeJava(padding).charAt(0);
+    }
+    builder.setPaddingCharacter(paddingCharacter);
+
+    String partitionFunctionName = config.getString(Column.getKeyFor(column, Column.PARTITION_FUNCTION), null);
+    if (partitionFunctionName != null) {
+      int numPartitions = config.getInt(Column.getKeyFor(column, Column.NUM_PARTITIONS));
+      PartitionFunction partitionFunction =
+          PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+      builder.setPartitionFunction(partitionFunction);
+      builder.setPartitions(
+          ColumnPartitionMetadata.extractPartitions(config.getList(Column.getKeyFor(column, Column.PARTITION_VALUES))));
+    }
+
+    return builder.build();
+  }
+
+  public static class Builder {
+    private FieldSpec _fieldSpec;
+    private int _totalDocs;
+    private int _cardinality;
+    private boolean _sorted;
+    private Comparable<?> _minValue;
+    private Comparable<?> _maxValue;
+    private boolean _hasDictionary;
+    private int _columnMaxLength;
+    private char _paddingCharacter;
+    private int _bitsPerElement;
+    private int _maxNumberOfMultiValues;
+    private int _totalNumberOfEntries;
+    private PartitionFunction _partitionFunction;
+    private Set<Integer> _partitions;
+    private boolean _autoGenerated;
+
+    public Builder setFieldSpec(FieldSpec fieldSpec) {
+      _fieldSpec = fieldSpec;
+      return this;
+    }
+
+    public Builder setTotalDocs(int totalDocs) {
+      _totalDocs = totalDocs;
+      return this;
+    }
+
+    public Builder setCardinality(int cardinality) {
+      _cardinality = cardinality;
+      return this;
+    }
+
+    public Builder setSorted(boolean sorted) {
+      _sorted = sorted;
+      return this;
+    }
+
+    public Builder setMinValue(Comparable<?> minValue) {
+      _minValue = minValue;
+      return this;
+    }
+
+    public Builder setMaxValue(Comparable<?> maxValue) {
+      _maxValue = maxValue;
+      return this;
+    }
+
+    public Builder setHasDictionary(boolean hasDictionary) {
+      _hasDictionary = hasDictionary;
+      return this;
+    }
+
+    public Builder setColumnMaxLength(int columnMaxLength) {
+      _columnMaxLength = columnMaxLength;
+      return this;
+    }
+
+    public Builder setPaddingCharacter(char paddingCharacter) {
+      _paddingCharacter = paddingCharacter;
+      return this;
+    }
+
+    public Builder setBitsPerElement(int bitsPerElement) {
+      _bitsPerElement = bitsPerElement;
+      return this;
+    }
+
+    public Builder setMaxNumberOfMultiValues(int maxNumberOfMultiValues) {
+      _maxNumberOfMultiValues = maxNumberOfMultiValues;
+      return this;
+    }
+
+    public Builder setTotalNumberOfEntries(int totalNumberOfEntries) {
+      _totalNumberOfEntries = totalNumberOfEntries;
+      return this;
+    }
+
+    public Builder setPartitionFunction(PartitionFunction partitionFunction) {
+      _partitionFunction = partitionFunction;
+      return this;
+    }
+
+    public Builder setPartitions(Set<Integer> partitions) {
+      _partitions = partitions;
+      return this;
+    }
+
+    public Builder setAutoGenerated(boolean autoGenerated) {
+      _autoGenerated = autoGenerated;
+      return this;
+    }
+
+    public ColumnMetadataImpl build() {
+      return new ColumnMetadataImpl(_fieldSpec, _totalDocs, _cardinality, _sorted, _minValue, _maxValue, _hasDictionary,
+          _columnMaxLength, _paddingCharacter, _bitsPerElement, _maxNumberOfMultiValues, _totalNumberOfEntries,
+          _partitionFunction, _partitions, _autoGenerated);
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 141031f..9cb43f2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -27,7 +27,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -44,9 +43,9 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
@@ -74,22 +73,16 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private long _creationTime = Long.MIN_VALUE;
   private String _timeColumn;
   private TimeUnit _timeUnit;
-  private Interval _timeInterval;
   private Duration _timeGranularity;
-  private long _pushTime = Long.MIN_VALUE;
-  private long _refreshTime = Long.MIN_VALUE;
-
-  private long _lastIndexedTime = Long.MIN_VALUE;
-  private long _latestIngestionTime = Long.MIN_VALUE;
+  private long _segmentStartTime = Long.MAX_VALUE;
+  private long _segmentEndTime = Long.MIN_VALUE;
+  private Interval _timeInterval;
 
   private SegmentVersion _segmentVersion;
   private List<StarTreeV2Metadata> _starTreeV2MetadataList;
   private String _creatorName;
-  private char _paddingCharacter = V1Constants.Str.DEFAULT_STRING_PAD_CHAR;
   private int _totalDocs;
-  private long _segmentStartTime;
-  private long _segmentEndTime;
-  private Map<String, String> _customMap;
+  private final Map<String, String> _customMap = new HashMap<>();
 
   @Deprecated
   private String _rawTableName;
@@ -102,7 +95,6 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     _indexDir = null;
     _columnMetadataMap = new HashMap<>();
     _schema = new Schema();
-    _customMap = new HashMap<>();
 
     PropertiesConfiguration segmentMetadataPropertiesConfiguration =
         CommonsConfigurationUtils.fromInputStream(metadataPropertiesInputStream);
@@ -110,10 +102,9 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     loadCreationMeta(creationMetaInputStream);
 
     setTimeInfo(segmentMetadataPropertiesConfiguration);
-    _totalDocs = segmentMetadataPropertiesConfiguration.getInt(V1Constants.MetadataKeys.Segment.SEGMENT_TOTAL_DOCS);
+    _totalDocs = segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
   }
 
-
   /**
    * For segments on disk.
    * <p>Index directory passed in should be top level segment directory.
@@ -122,10 +113,9 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   public SegmentMetadataImpl(File indexDir)
       throws IOException {
     _indexDir = indexDir;
-    PropertiesConfiguration segmentMetadataPropertiesConfiguration = getPropertiesConfiguration(indexDir);
     _columnMetadataMap = new HashMap<>();
+    PropertiesConfiguration segmentMetadataPropertiesConfiguration = getPropertiesConfiguration(indexDir);
     _schema = new Schema();
-    _customMap = new HashMap<>();
 
     init(segmentMetadataPropertiesConfiguration);
     File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -134,41 +124,19 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     }
 
     setTimeInfo(segmentMetadataPropertiesConfiguration);
-    _totalDocs = segmentMetadataPropertiesConfiguration.getInt(V1Constants.MetadataKeys.Segment.SEGMENT_TOTAL_DOCS);
+    _totalDocs = segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
   }
 
   /**
    * For REALTIME consuming segments.
    */
-  public SegmentMetadataImpl(String rawTableName, String segmentName, long creationTime, long startTime, long endTime,
-      @Nullable TimeUnit timeUnit, long totalDocs, long crc, Schema schema) {
+  public SegmentMetadataImpl(String rawTableName, String segmentName, Schema schema, long creationTime) {
     _indexDir = null;
-    PropertiesConfiguration segmentMetadataPropertiesConfiguration = new PropertiesConfiguration();
-    segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.SEGMENT_CREATOR_VERSION, null);
-    segmentMetadataPropertiesConfiguration
-        .addProperty(V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER, V1Constants.Str.DEFAULT_STRING_PAD_CHAR);
-    segmentMetadataPropertiesConfiguration
-        .addProperty(V1Constants.MetadataKeys.Segment.SEGMENT_START_TIME, Long.toString(startTime));
-    segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.SEGMENT_END_TIME, Long.toString(endTime));
-    segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.TABLE_NAME, rawTableName);
-
-    if (timeUnit != null) {
-      segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.TIME_UNIT, timeUnit.toString());
-    } else {
-      segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.TIME_UNIT, null);
-    }
-
-    segmentMetadataPropertiesConfiguration.addProperty(V1Constants.MetadataKeys.Segment.SEGMENT_TOTAL_DOCS, totalDocs);
-
-    _crc = crc;
-    _creationTime = creationTime;
-    setTimeInfo(segmentMetadataPropertiesConfiguration);
     _columnMetadataMap = null;
     _rawTableName = rawTableName;
     _segmentName = segmentName;
     _schema = schema;
-    _totalDocs = segmentMetadataPropertiesConfiguration.getInt(V1Constants.MetadataKeys.Segment.SEGMENT_TOTAL_DOCS);
-    _customMap = new HashMap<>();
+    _creationTime = creationTime;
   }
 
   public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) {
@@ -188,29 +156,22 @@ public class SegmentMetadataImpl implements SegmentMetadata {
    * </ul>
    */
   private void setTimeInfo(PropertiesConfiguration segmentMetadataPropertiesConfiguration) {
-    _timeColumn = segmentMetadataPropertiesConfiguration.getString(V1Constants.MetadataKeys.Segment.TIME_COLUMN_NAME);
-    if (segmentMetadataPropertiesConfiguration.containsKey(V1Constants.MetadataKeys.Segment.SEGMENT_START_TIME) && segmentMetadataPropertiesConfiguration
-        .containsKey(V1Constants.MetadataKeys.Segment.SEGMENT_END_TIME) && segmentMetadataPropertiesConfiguration.containsKey(
-        V1Constants.MetadataKeys.Segment.TIME_UNIT)) {
+    _timeColumn = segmentMetadataPropertiesConfiguration.getString(Segment.TIME_COLUMN_NAME);
+    if (segmentMetadataPropertiesConfiguration.containsKey(Segment.SEGMENT_START_TIME)
+        && segmentMetadataPropertiesConfiguration.containsKey(Segment.SEGMENT_END_TIME)
+        && segmentMetadataPropertiesConfiguration.containsKey(Segment.TIME_UNIT)) {
       try {
-        _timeUnit = TimeUtils.timeUnitFromString(segmentMetadataPropertiesConfiguration.getString(
-            V1Constants.MetadataKeys.Segment.TIME_UNIT));
+        _timeUnit = TimeUtils.timeUnitFromString(segmentMetadataPropertiesConfiguration.getString(Segment.TIME_UNIT));
         assert _timeUnit != null;
         _timeGranularity = new Duration(_timeUnit.toMillis(1));
-        String startTimeString = segmentMetadataPropertiesConfiguration.getString(
-            V1Constants.MetadataKeys.Segment.SEGMENT_START_TIME);
-        String endTimeString = segmentMetadataPropertiesConfiguration.getString(
-            V1Constants.MetadataKeys.Segment.SEGMENT_END_TIME);
+        String startTimeString = segmentMetadataPropertiesConfiguration.getString(Segment.SEGMENT_START_TIME);
+        String endTimeString = segmentMetadataPropertiesConfiguration.getString(Segment.SEGMENT_END_TIME);
         _segmentStartTime = Long.parseLong(startTimeString);
         _segmentEndTime = Long.parseLong(endTimeString);
         _timeInterval =
             new Interval(_timeUnit.toMillis(_segmentStartTime), _timeUnit.toMillis(_segmentEndTime), DateTimeZone.UTC);
       } catch (Exception e) {
         LOGGER.warn("Caught exception while setting time interval and granularity", e);
-        _timeInterval = null;
-        _timeGranularity = null;
-        _segmentStartTime = Long.MAX_VALUE;
-        _segmentEndTime = Long.MIN_VALUE;
       }
     }
   }
@@ -233,24 +194,13 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     }
   }
 
-  public Set<String> getAllColumns() {
-    return _schema.getColumnNames();
-  }
-
   private void init(PropertiesConfiguration segmentMetadataPropertiesConfiguration) {
-    if (segmentMetadataPropertiesConfiguration.containsKey(V1Constants.MetadataKeys.Segment.SEGMENT_CREATOR_VERSION)) {
-      _creatorName = segmentMetadataPropertiesConfiguration.getString(
-          V1Constants.MetadataKeys.Segment.SEGMENT_CREATOR_VERSION);
-    }
-
-    if (segmentMetadataPropertiesConfiguration.containsKey(V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER)) {
-      String padding = segmentMetadataPropertiesConfiguration.getString(
-          V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER);
-      _paddingCharacter = StringEscapeUtils.unescapeJava(padding).charAt(0);
+    if (segmentMetadataPropertiesConfiguration.containsKey(Segment.SEGMENT_CREATOR_VERSION)) {
+      _creatorName = segmentMetadataPropertiesConfiguration.getString(Segment.SEGMENT_CREATOR_VERSION);
     }
 
     String versionString =
-        segmentMetadataPropertiesConfiguration.getString(V1Constants.MetadataKeys.Segment.SEGMENT_VERSION, SegmentVersion.v1.toString());
+        segmentMetadataPropertiesConfiguration.getString(Segment.SEGMENT_VERSION, SegmentVersion.v1.toString());
     _segmentVersion = SegmentVersion.valueOf(versionString);
 
     // NOTE: here we only add physical columns as virtual columns should not be loaded from metadata file
@@ -258,24 +208,24 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     // - If key does not exist, it will return an empty list
     // - If key exists but value is missing, it will return a singleton list with an empty string
     Set<String> physicalColumns = new HashSet<>();
-    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(V1Constants.MetadataKeys.Segment.DIMENSIONS), physicalColumns);
-    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(V1Constants.MetadataKeys.Segment.METRICS), physicalColumns);
-    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(V1Constants.MetadataKeys.Segment.TIME_COLUMN_NAME), physicalColumns);
-    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(V1Constants.MetadataKeys.Segment.DATETIME_COLUMNS), physicalColumns);
+    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.DIMENSIONS), physicalColumns);
+    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.METRICS), physicalColumns);
+    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.TIME_COLUMN_NAME), physicalColumns);
+    addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.DATETIME_COLUMNS), physicalColumns);
 
     // Set the table name (for backward compatibility)
-    String tableName = segmentMetadataPropertiesConfiguration.getString(V1Constants.MetadataKeys.Segment.TABLE_NAME);
+    String tableName = segmentMetadataPropertiesConfiguration.getString(Segment.TABLE_NAME);
     if (tableName != null) {
       _rawTableName = TableNameBuilder.extractRawTableName(tableName);
     }
 
     // Set segment name.
-    _segmentName = segmentMetadataPropertiesConfiguration.getString(V1Constants.MetadataKeys.Segment.SEGMENT_NAME);
+    _segmentName = segmentMetadataPropertiesConfiguration.getString(Segment.SEGMENT_NAME);
 
     // Build column metadata map and schema.
     for (String column : physicalColumns) {
       ColumnMetadata columnMetadata =
-          ColumnMetadata.fromPropertiesConfiguration(column, segmentMetadataPropertiesConfiguration);
+          ColumnMetadataImpl.fromPropertiesConfiguration(column, segmentMetadataPropertiesConfiguration);
       _columnMetadataMap.put(column, columnMetadata);
       _schema.addField(columnMetadata.getFieldSpec());
     }
@@ -297,8 +247,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
 
   private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration,
       Map<String, String> customConfigsMap) {
-    Configuration customConfigs =
-        segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET);
+    Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(Segment.CUSTOM_SUBSET);
     Iterator<String> customKeysIter = customConfigs.getKeys();
     while (customKeysIter.hasNext()) {
       String key = customKeysIter.next();
@@ -318,29 +267,17 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     }
   }
 
-  public ColumnMetadata getColumnMetadataFor(String column) {
-    return _columnMetadataMap.get(column);
-  }
-
-  public Map<String, ColumnMetadata> getColumnMetadataMap() {
-    return _columnMetadataMap;
-  }
-
-  /**
-   * Removes a column from the segment metadata.
-   */
-  public void removeColumn(String column) {
-    Preconditions.checkState(!column.equals(_timeColumn), "Cannot remove time column: %s", _timeColumn);
-    _columnMetadataMap.remove(column);
-    _schema.removeField(column);
-  }
-
   @Override
   public String getTableName() {
     return _rawTableName;
   }
 
   @Override
+  public String getName() {
+    return _segmentName;
+  }
+
+  @Override
   public String getTimeColumn() {
     return _timeColumn;
   }
@@ -376,11 +313,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   }
 
   @Override
-  public String getVersion() {
-    return _segmentVersion.toString();
-  }
-
-  public SegmentVersion getSegmentVersion() {
+  public SegmentVersion getVersion() {
     return _segmentVersion;
   }
 
@@ -390,11 +323,6 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   }
 
   @Override
-  public String getShardingKey() {
-    return null;
-  }
-
-  @Override
   public int getTotalDocs() {
     return _totalDocs;
   }
@@ -404,42 +332,10 @@ public class SegmentMetadataImpl implements SegmentMetadata {
     return _indexDir;
   }
 
+  @Nullable
   @Override
-  public String getName() {
-    return _segmentName;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder result = new StringBuilder();
-    final String newLine = System.getProperty("line.separator");
-
-    result.append(this.getClass().getName());
-    result.append(" Object {");
-    result.append(newLine);
-
-    // determine fields declared in this class only (no fields of superclass)
-    final Field[] fields = this.getClass().getDeclaredFields();
-
-    // print field names paired with their values
-    for (final Field field : fields) {
-      result.append("  ");
-      try {
-        result.append(field.getName());
-        result.append(": ");
-        // requires access to private field:
-        result.append(field.get(this));
-      } catch (final IllegalAccessException ex) {
-        if (LOGGER.isWarnEnabled()) {
-          LOGGER.warn("Caught exception while trying to access field {}", field, ex);
-        }
-        result.append("ERROR");
-      }
-      result.append(newLine);
-    }
-    result.append("}");
-
-    return result.toString();
+  public String getCreatorName() {
+    return _creatorName;
   }
 
   @Override
@@ -448,108 +344,38 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   }
 
   @Override
-  public long getPushTime() {
-    return _pushTime;
-  }
-
-  @Override
-  public long getRefreshTime() {
-    return _refreshTime;
-  }
-
-  @Override
   public long getLastIndexedTimestamp() {
-    return _lastIndexedTime;
+    return Long.MIN_VALUE;
   }
 
   @Override
   public long getLatestIngestionTimestamp() {
-    return _latestIngestionTime;
-  }
-
-  @Override
-  public boolean hasDictionary(String columnName) {
-    return _columnMetadataMap.get(columnName).hasDictionary();
-  }
-
-  @Override
-  public boolean close() {
-    return false;
+    return Long.MIN_VALUE;
   }
 
   @Override
-  public Map<String, String> getCustomMap() {
-    return _customMap;
-  }
-
   public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
     return _starTreeV2MetadataList;
   }
 
   @Override
-  public String getForwardIndexFileName(String column) {
-    ColumnMetadata columnMetadata = getColumnMetadataFor(column);
-    StringBuilder fileNameBuilder = new StringBuilder(column);
-    // starting v2 we will append the forward index files with version
-    // if (!SegmentVersion.v1.toString().equalsIgnoreCase(segmentVersion)) {
-    // fileNameBuilder.append("_").append(segmentVersion);
-    // }
-    if (columnMetadata.isSingleValue()) {
-      if (!columnMetadata.hasDictionary()) {
-        fileNameBuilder.append(V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
-      } else if (columnMetadata.isSorted()) {
-        fileNameBuilder.append(V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
-      } else {
-        fileNameBuilder.append(V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
-      }
-    } else {
-      fileNameBuilder.append(V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
-    }
-    return fileNameBuilder.toString();
-  }
-
-  @Override
-  public String getDictionaryFileName(String column) {
-    return column + V1Constants.Dict.FILE_EXTENSION;
-  }
-
-  @Override
-  public String getBitmapInvertedIndexFileName(String column) {
-    return column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
-  }
-
-  @Override
-  public String getBitmapRangeIndexFileName(String column) {
-    return column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
-  }
-
-  @Override
-  public String getBloomFilterFileName(String column) {
-    return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
+  public Map<String, String> getCustomMap() {
+    return _customMap;
   }
 
   @Override
-  public String getNullValueVectorFileName(String column) {
-    return column + V1Constants.Indexes.NULLVALUE_VECTOR_FILE_EXTENSION;
+  public Map<String, ColumnMetadata> getColumnMetadataMap() {
+    return _columnMetadataMap;
   }
 
-  @Nullable
   @Override
-  public String getCreatorName() {
-    return _creatorName;
+  public void removeColumn(String column) {
+    Preconditions.checkState(!column.equals(_timeColumn), "Cannot remove time column: %s", _timeColumn);
+    _columnMetadataMap.remove(column);
+    _schema.removeField(column);
   }
 
   @Override
-  public char getPaddingCharacter() {
-    return _paddingCharacter;
-  }
-
-  /**
-   * Converts segment metadata to json
-   * @param columnFilter list only  the columns in the set. Lists all the columns if
-   *                     the parameter value is null
-   * @return json representation of segment metadata
-   */
   public JsonNode toJson(@Nullable Set<String> columnFilter) {
     ObjectNode segmentMetadata = JsonUtils.newObjectNode();
     segmentMetadata.put("segmentName", _segmentName);
@@ -574,17 +400,8 @@ public class SegmentMetadataImpl implements SegmentMetadata {
       segmentMetadata.put("endTimeReadable", _timeInterval.getEnd().toString());
     }
 
-    segmentMetadata.put("pushTimeMillis", _pushTime);
-    String pushTimeStr = _pushTime != Long.MIN_VALUE ? dateFormat.format(new Date(_pushTime)) : null;
-    segmentMetadata.put("pushTimeReadable", pushTimeStr);
-
-    segmentMetadata.put("refreshTimeMillis", _refreshTime);
-    String refreshTimeStr = _refreshTime != Long.MIN_VALUE ? dateFormat.format(new Date(_refreshTime)) : null;
-    segmentMetadata.put("refreshTimeReadable", refreshTimeStr);
-
     segmentMetadata.put("segmentVersion", _segmentVersion.toString());
     segmentMetadata.put("creatorName", _creatorName);
-    segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter));
 
     ObjectNode customConfigs = JsonUtils.newObjectNode();
     for (String key : _customMap.keySet()) {
@@ -604,4 +421,9 @@ public class SegmentMetadataImpl implements SegmentMetadata {
 
     return segmentMetadata;
   }
+
+  @Override
+  public String toString() {
+    return toJson(null).toString();
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentMetadataFetcher.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentMetadataFetcher.java
index fe24004..f56af91 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentMetadataFetcher.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/SegmentMetadataFetcher.java
@@ -31,8 +31,8 @@ import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
@@ -60,7 +60,7 @@ public class SegmentMetadataFetcher {
   private static final String STAR_TREE_METRIC_AGGREGATIONS = "metric-aggregations";
   private static final String STAR_TREE_MAX_LEAF_RECORDS = "max-leaf-records";
   private static final String STAR_TREE_DIMENSION_COLUMNS_SKIPPED = "dimension-columns-skipped";
-  
+
 
   /**
    * This is a helper method that fetches the segment metadata for a given segment.
@@ -68,7 +68,7 @@ public class SegmentMetadataFetcher {
    */
   public static String getSegmentMetadata(SegmentDataManager segmentDataManager, List<String> columns)
       throws JsonProcessingException {
-    SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segmentDataManager.getSegment().getSegmentMetadata();
+    SegmentMetadata segmentMetadata = segmentDataManager.getSegment().getSegmentMetadata();
     Set<String> columnSet;
     if (columns.size() == 1 && columns.get(0).equals("*")) {
       columnSet = null;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 45c657f..8d52c80 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -61,7 +61,6 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.api.access.AccessControl;
 import org.apache.pinot.server.api.access.AccessControlFactory;
 import org.apache.pinot.server.starter.ServerInstance;
@@ -171,9 +170,8 @@ public class TablesResource {
     try {
       Map<String, String> segmentCrcForTable = new HashMap<>();
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        SegmentMetadataImpl segmentMetadata =
-            (SegmentMetadataImpl) segmentDataManager.getSegment().getSegmentMetadata();
-        segmentCrcForTable.put(segmentDataManager.getSegmentName(), segmentMetadata.getCrc());
+        segmentCrcForTable
+            .put(segmentDataManager.getSegmentName(), segmentDataManager.getSegment().getSegmentMetadata().getCrc());
       }
       return ResourceUtils.convertToJsonString(segmentCrcForTable);
     } catch (Exception e) {
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 29864ba..997dbfb 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.restlet.resources.TablesList;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -108,16 +109,10 @@ public class TablesResourceTest extends BaseResourceTest {
 
     JsonNode jsonResponse =
         JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
-    SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) defaultSegment.getSegmentMetadata();
+    SegmentMetadata segmentMetadata = defaultSegment.getSegmentMetadata();
     Assert.assertEquals(jsonResponse.get("segmentName").asText(), segmentMetadata.getName());
     Assert.assertEquals(jsonResponse.get("crc").asText(), segmentMetadata.getCrc());
     Assert.assertEquals(jsonResponse.get("creationTimeMillis").asLong(), segmentMetadata.getIndexCreationTime());
-    Assert.assertEquals(jsonResponse.get("paddingCharacter").asText(),
-        String.valueOf(segmentMetadata.getPaddingCharacter()));
-    Assert.assertEquals(jsonResponse.get("refreshTimeMillis").asLong(), segmentMetadata.getRefreshTime());
-    Assert.assertEquals(jsonResponse.get("pushTimeMillis").asLong(), segmentMetadata.getPushTime());
-    Assert.assertTrue(jsonResponse.has("pushTimeReadable"));
-    Assert.assertTrue(jsonResponse.has("refreshTimeReadable"));
     Assert.assertTrue(jsonResponse.has("startTimeReadable"));
     Assert.assertTrue(jsonResponse.has("endTimeReadable"));
     Assert.assertTrue(jsonResponse.has("creationTimeReadable"));
@@ -212,32 +207,41 @@ public class TablesResourceTest extends BaseResourceTest {
   }
 
   @Test
-  public void testUploadSegments() throws Exception {
-    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS, null, _realtimeIndexSegments);
-    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE, null, _realtimeIndexSegments);
+  public void testUploadSegments()
+      throws Exception {
+    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS, null,
+        _realtimeIndexSegments);
+    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE, null,
+        _realtimeIndexSegments);
 
     // Verify segment uploading succeed.
-    Response response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
-        LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+    Response response = _webTarget.path(String
+        .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+            LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
     Assert.assertEquals(response.readEntity(String.class), SEGMENT_DOWNLOAD_URL);
 
     // Verify bad request: table type is offline
-    response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), _offlineIndexSegments.get(0).getSegmentName())).request().post(null);
+    response = _webTarget.path(String
+        .format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
+            _offlineIndexSegments.get(0).getSegmentName())).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
 
     // Verify bad request: segment is not low level consumer segment
-    response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), _realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
+    response = _webTarget.path(String
+        .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+            _realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
 
     // Verify non-existent segment uploading fail with NOT_FOUND status.
-    response = _webTarget.path(String.format("/segments/%s/%s_dummy/upload", TABLE_NAME,
-        LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+    response =
+        _webTarget.path(String.format("/segments/%s/%s_dummy/upload", TABLE_NAME, LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS))
+            .request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
 
     // Verify fail to upload segment to segment store with internal server error.
-    response = _webTarget.path(String.format("/segments/%s/%s/upload", TABLE_NAME,
-        LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)).request().post(null);
+    response = _webTarget.path(String.format("/segments/%s/%s/upload", TABLE_NAME, LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE))
+        .request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
   }
 
@@ -252,16 +256,10 @@ public class TablesResourceTest extends BaseResourceTest {
     JsonNode jsonResponse =
         JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
 
-    SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) defaultSegment.getSegmentMetadata();
+    SegmentMetadata segmentMetadata = defaultSegment.getSegmentMetadata();
     Assert.assertEquals(jsonResponse.get("segmentName").asText(), segmentMetadata.getName());
     Assert.assertEquals(jsonResponse.get("crc").asText(), segmentMetadata.getCrc());
     Assert.assertEquals(jsonResponse.get("creationTimeMillis").asLong(), segmentMetadata.getIndexCreationTime());
-    Assert.assertEquals(jsonResponse.get("paddingCharacter").asText(),
-        String.valueOf(segmentMetadata.getPaddingCharacter()));
-    Assert.assertEquals(jsonResponse.get("refreshTimeMillis").asLong(), segmentMetadata.getRefreshTime());
-    Assert.assertEquals(jsonResponse.get("pushTimeMillis").asLong(), segmentMetadata.getPushTime());
-    Assert.assertTrue(jsonResponse.has("pushTimeReadable"));
-    Assert.assertTrue(jsonResponse.has("refreshTimeReadable"));
     Assert.assertTrue(jsonResponse.has("startTimeReadable"));
     Assert.assertTrue(jsonResponse.has("endTimeReadable"));
     Assert.assertTrue(jsonResponse.has("creationTimeReadable"));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/ArrayBasedGlobalDictionaries.java b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/ArrayBasedGlobalDictionaries.java
index 6816b55..c7447c8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/ArrayBasedGlobalDictionaries.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/ArrayBasedGlobalDictionaries.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.ByteArray;
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/GlobalDictionaries.java b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/GlobalDictionaries.java
index 762125b..1450dd5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/GlobalDictionaries.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/GlobalDictionaries.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.tools.anonymizer;
 
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 
 
 public interface GlobalDictionaries {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/MapBasedGlobalDictionaries.java b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/MapBasedGlobalDictionaries.java
index 3f6bc52..432ba78 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/MapBasedGlobalDictionaries.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/MapBasedGlobalDictionaries.java
@@ -29,7 +29,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.ByteArray;
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/PinotDataAndQueryAnonymizer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/PinotDataAndQueryAnonymizer.java
index 351eadb..6d4f8bf 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/PinotDataAndQueryAnonymizer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/anonymizer/PinotDataAndQueryAnonymizer.java
@@ -66,9 +66,9 @@ import org.apache.pinot.pql.parsers.pql2.ast.StringLiteralAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.WhereAstNode;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
index 65280ed..3c4f117 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
@@ -26,8 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
index 2300054..71ffe3a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
@@ -33,9 +33,9 @@ import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -84,7 +84,6 @@ class SegmentQueryProcessor {
   }
 
   public void close() {
-    _metadata.close();
     _immutableSegment.destroy();
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org