You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/04/03 00:16:26 UTC

[hudi] branch master updated: [HUDI-3664] Fixing Column Stats Index composition (#5181)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3737b  [HUDI-3664] Fixing Column Stats Index composition  (#5181)
cc3737b is described below

commit cc3737be506475c11c12471be6d0296ea14c7f39
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Sat Apr 2 17:15:52 2022 -0700

    [HUDI-3664] Fixing Column Stats Index composition  (#5181)
    
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 azure-pipelines.yml                                |  22 +-
 .../hudi/cli/commands/TestCleansCommand.java       |   9 +-
 .../hudi/cli/commands/TestRollbacksCommand.java    |   8 +
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  13 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  35 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   3 +-
 .../org/apache/hudi/client/TestClientRollback.java |   9 +-
 .../functional/TestHoodieBackedMetadata.java       |  96 +++--
 .../functional/TestHoodieBackedTableMetadata.java  |  26 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |   1 +
 .../table/action/compact/TestInlineCompaction.java |   4 +-
 .../table/functional/TestCleanPlanExecutor.java    |  21 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |  44 ++-
 hudi-common/pom.xml                                |   6 +
 hudi-common/src/main/avro/HoodieMetadata.avsc      | 186 +++++++++-
 .../apache/hudi/avro/ConvertingGenericData.java    | 144 ++++++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 106 +++++-
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   |   2 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  14 +-
 .../common/model/HoodieColumnRangeMetadata.java    |  34 --
 .../hudi/common/model/HoodieCommitMetadata.java    |   5 +-
 .../hudi/common/model/HoodieDeltaWriteStat.java    |  20 +-
 .../org/apache/hudi/common/util/DateTimeUtils.java |  29 ++
 .../hudi/metadata/HoodieMetadataPayload.java       | 177 ++++++++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 398 ++++++++++++++-------
 .../hudi/metadata/MetadataPartitionType.java       |   2 +
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  43 ++-
 .../common/functional/TestHoodieLogFormat.java     | 116 +++---
 .../hudi/common/testutils/FileCreateUtils.java     |  19 +-
 .../hudi/common/testutils/HoodieTestTable.java     |   8 +-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  | 216 +++++++++++
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  64 +---
 .../spark/{ => sql}/HoodieSparkTypeUtils.scala     |  11 +-
 .../spark/{ => sql}/HoodieUnsafeRDDUtils.scala     |  22 +-
 .../apache/spark/sql/hudi/DataSkippingUtils.scala  |   2 +-
 .../org/apache/hudi}/ColumnStatsIndexHelper.java   |  65 +---
 .../index/zorder/column-stats-index-table.json     |   4 +
 .../zorder/updated-column-stats-index-table.json   |   8 +
 .../index/zorder/z-index-table-merged.json         |   8 -
 .../test/resources/index/zorder/z-index-table.json |   4 -
 .../org/apache/hudi/TestDataSkippingUtils.scala    |  52 ++-
 .../hudi/functional/TestColumnStatsIndex.scala     | 236 +++++-------
 .../TestMetadataTableWithSparkDataSource.scala     |  17 +-
 .../functional/TestParquetColumnProjection.scala   |   3 +-
 .../benchmark/SpaceCurveOptimizeBenchmark.scala    |   4 +-
 .../sql/HoodieSpark2CatalystExpressionUtils.scala  |   2 +-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  60 +++-
 .../HoodieSpark3_1CatalystExpressionUtils.scala    |   2 +-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  59 ++-
 .../HoodieSpark3_2CatalystExpressionUtils.scala    |   2 +-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  59 ++-
 .../utilities/HoodieMetadataTableValidator.java    |  35 +-
 52 files changed, 1781 insertions(+), 754 deletions(-)

diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index f1b25db..8ca54c1 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -43,7 +43,7 @@ stages:
               options: -T 2.5C -DskipTests
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: UT common flink client/spark-client
             inputs:
@@ -52,7 +52,7 @@ stages:
               options: -Punit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: FT common flink
             inputs:
@@ -61,7 +61,7 @@ stages:
               options: -Pfunctional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_2
         displayName: FT client/spark-client
         timeoutInMinutes: '90'
@@ -74,7 +74,7 @@ stages:
               options: -T 2.5C -DskipTests
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: FT client/spark-client
             inputs:
@@ -83,7 +83,7 @@ stages:
               options: -Pfunctional-tests -pl hudi-client/hudi-spark-client
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_3
         displayName: UT FT clients & cli & utilities & sync/hive-sync
         timeoutInMinutes: '90'
@@ -96,7 +96,7 @@ stages:
               options: -T 2.5C -DskipTests
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: UT clients & cli & utilities & sync/hive-sync
             inputs:
@@ -105,7 +105,7 @@ stages:
               options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: FT clients & cli & utilities & sync/hive-sync
             inputs:
@@ -114,7 +114,7 @@ stages:
               options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_4
         displayName: UT FT other modules
         timeoutInMinutes: '90'
@@ -127,7 +127,7 @@ stages:
               options: -T 2.5C -DskipTests
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: UT other modules
             inputs:
@@ -136,7 +136,7 @@ stages:
               options: -Punit-tests -pl $(EXCLUDE_TESTED_MODULES)
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
           - task: Maven@3
             displayName: FT other modules
             inputs:
@@ -145,7 +145,7 @@ stages:
               options: -Pfunctional-tests -pl $(EXCLUDE_TESTED_MODULES)
               publishJUnitResults: false
               jdkVersionOption: '1.8'
-              mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+              mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: IT
         displayName: IT modules
         timeoutInMinutes: '90'
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
index c475c63..cac4f13 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
@@ -27,6 +27,7 @@ import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +49,10 @@ import org.springframework.shell.core.CommandResult;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -94,8 +98,11 @@ public class TestCleansCommand extends CLIFunctionalTestHarness {
       // Inflight Compaction
       HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
           new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+
+      Map<String, String> extraCommitMetadata =
+          Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
       HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2,
-          Option.empty(), Option.empty());
+          Option.empty(), Option.empty(), extraCommitMetadata);
     }
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
index 0aec7c5..cf4faf2 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
@@ -26,6 +26,7 @@ import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -83,6 +84,13 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
     };
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withMetadataConfig(
+            // Column Stats Index is disabled, since these tests construct tables which are
+            // not valid (empty commit metadata, etc)
+            HoodieMetadataConfig.newBuilder()
+                .withMetadataIndexColumnStats(false)
+                .build()
+        )
         .withRollbackUsingMarkers(false)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
     HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 522bbb3..8897152 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieRangeInfoHandle;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -55,6 +54,7 @@ import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 
@@ -206,7 +206,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
    * @return List of partition and file column range info pairs
    */
   protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
-      List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+      List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
 
@@ -221,15 +221,16 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
         return Stream.empty();
       }
       try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap = hoodieTable
-            .getMetadataTable().getColumnStats(partitionFileNameList, keyField);
+        Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
+            hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
         List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
         for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
           result.add(Pair.of(entry.getKey().getLeft(),
               new BloomIndexFileInfo(
                   FSUtils.getFileId(entry.getKey().getRight()),
-                  entry.getValue().getMinValue(),
-                  entry.getValue().getMaxValue()
+                  // NOTE: Here we assume that the type of the primary key field is string
+                  (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
+                  (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
               )));
         }
         return result.stream();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index efae458..4ab4be3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -56,11 +60,6 @@ import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -76,8 +75,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
 
 /**
  * IO Operation to append data onto an existing file.
@@ -349,26 +347,21 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
 
     if (config.isMetadataColumnStatsIndexEnabled()) {
       final List<Schema.Field> fieldsToIndex;
-      if (!StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
+      if (StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
+        // If column stats index is enabled but columns not configured then we assume that all columns should be indexed
+        fieldsToIndex = writeSchemaWithMetaFields.getFields();
+      } else {
         Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
             .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
+
         fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
             .filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
-      } else {
-        // if column stats index is enabled but columns not configured then we assume that all columns should be indexed
-        fieldsToIndex = writeSchemaWithMetaFields.getFields();
       }
 
-      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
-          ? stat.getRecordsStats().get().getStats() : new HashMap<>();
-      final String filePath = stat.getPath();
-      // initialize map of column name to map of stats name to stats value
-      Map<String, Map<String, Object>> columnToStats = new HashMap<>();
-      fieldsToIndex.forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
-      // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
-      recordList.forEach(record -> aggregateColumnStats(record, fieldsToIndex, columnToStats, config.isConsistentLogicalTimestampEnabled()));
-      fieldsToIndex.forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
-      stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
+          collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());
+
+      stat.setRecordsStats(columnRangesMetadataMap);
     }
 
     resetWriteCounts();
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index b46995e..57eb32f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -31,12 +31,11 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.util.Properties
 
 import org.apache.hudi.internal.schema.InternalSchema
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index c06b0a0..d2dabc0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -318,7 +318,14 @@ public class TestClientRollback extends HoodieClientTestBase {
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withRollbackUsingMarkers(false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder()
+                // Column Stats Index is disabled, since these tests construct tables which are
+                // not valid (empty commit metadata, invalid parquet files)
+                .withMetadataIndexColumnStats(false)
+                .enable(enableMetadataTable)
+                .build()
+        )
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 5c73d96..61c2775 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -154,6 +154,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("functional")
 public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
@@ -859,30 +860,31 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
-      HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
-
-      while (logFileReader.hasNext()) {
-        HoodieLogBlock logBlock = logFileReader.next();
-        if (logBlock instanceof HoodieDataBlock) {
-          try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
-            recordItr.forEachRemaining(indexRecord -> {
-              final GenericRecord record = (GenericRecord) indexRecord;
-              if (enableMetaFields) {
-                // Metadata table records should have meta fields!
-                assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
-                assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
-              } else {
-                // Metadata table records should not have meta fields!
-                assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
-                assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
-              }
-
-              final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
-              assertFalse(key.isEmpty());
-              if (enableMetaFields) {
-                assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
-              }
-            });
+
+      try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+        while (logFileReader.hasNext()) {
+          HoodieLogBlock logBlock = logFileReader.next();
+          if (logBlock instanceof HoodieDataBlock) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+              recordItr.forEachRemaining(indexRecord -> {
+                final GenericRecord record = (GenericRecord) indexRecord;
+                if (enableMetaFields) {
+                  // Metadata table records should have meta fields!
+                  assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+                  assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+                } else {
+                  // Metadata table records should not have meta fields!
+                  assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+                  assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+                }
+
+                final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
+                assertFalse(key.isEmpty());
+                if (enableMetaFields) {
+                  assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
+                }
+              });
+            }
           }
         }
       }
@@ -2214,11 +2216,57 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       assertTrue(latestSlices.size()
           <= (numFileVersions * metadataEnabledPartitionTypes.get(partition).getFileGroupCount()), "Should limit file slice to "
           + numFileVersions + " per file group, but was " + latestSlices.size());
+      List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
+      try {
+        if (MetadataPartitionType.FILES.getPartitionPath().equals(partition)) {
+          verifyMetadataRawRecords(table, logFiles, false);
+        }
+        if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
+          verifyMetadataColumnStatsRecords(logFiles);
+        }
+      } catch (IOException e) {
+        LOG.error("Metadata record validation failed", e);
+        fail("Metadata record validation failed");
+      }
     });
 
     LOG.info("Validation time=" + timer.endTimer());
   }
 
+  private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
+    for (HoodieLogFile logFile : logFiles) {
+      FileStatus[] fsStatus = fs.listStatus(logFile.getPath());
+      MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath());
+      if (writerSchemaMsg == null) {
+        // not a data block
+        continue;
+      }
+
+      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
+      try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+        while (logFileReader.hasNext()) {
+          HoodieLogBlock logBlock = logFileReader.next();
+          if (logBlock instanceof HoodieDataBlock) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+              recordItr.forEachRemaining(indexRecord -> {
+                final GenericRecord record = (GenericRecord) indexRecord;
+                final GenericRecord colStatsRecord = (GenericRecord) record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS);
+                assertNotNull(colStatsRecord);
+                assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME));
+                assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT));
+                /**
+                 * TODO: some types of field may have null min/max as these statistics are only supported for primitive types
+                 * assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE));
+                 * assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE));
+                 */
+              });
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Returns the list of all files in the dataset by iterating over the metadata table.
    *
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 70f54b1..323724a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -288,19 +288,19 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
       }
 
       Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
-      HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
-
-      while (logFileReader.hasNext()) {
-        HoodieLogBlock logBlock = logFileReader.next();
-        if (logBlock instanceof HoodieDataBlock) {
-          try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
-            recordItr.forEachRemaining(indexRecord -> {
-              final GenericRecord record = (GenericRecord) indexRecord;
-              assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
-              assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
-              final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
-              assertFalse(key.isEmpty());
-            });
+      try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+        while (logFileReader.hasNext()) {
+          HoodieLogBlock logBlock = logFileReader.next();
+          if (logBlock instanceof HoodieDataBlock) {
+            try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+              recordItr.forEachRemaining(indexRecord -> {
+                final GenericRecord record = (GenericRecord) indexRecord;
+                assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+                assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+                final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
+                assertFalse(key.isEmpty());
+              });
+            }
           }
         }
       }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index dd8a83b..7e774c3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -1509,6 +1509,7 @@ public class TestCleaner extends HoodieClientTestBase {
   protected static HoodieCommitMetadata generateCommitMetadata(
       String instantTime, Map<String, List<String>> partitionToFilePaths) {
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
     partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setPartitionPath(partitionPath);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 310ff4f..7f1046b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -119,7 +119,7 @@ public class TestInlineCompaction extends CompactionTestBase {
   @Test
   public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
     // Given: make three commits
-    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME);
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
       List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -134,7 +134,7 @@ public class TestInlineCompaction extends CompactionTestBase {
       assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
       // 4th commit, that will trigger compaction because reach the time elapsed
       metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
-      finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+      finalInstant = HoodieActiveTimeline.createNewInstantTime(60000);
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
       metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index 60367f8..90d0f88 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -111,7 +111,10 @@ public class TestCleanPlanExecutor extends TestCleaner {
       boolean simulateFailureRetry, boolean simulateMetadataFailure,
       boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder()
+                .withAssumeDatePartitioning(true)
+                .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
@@ -384,7 +387,13 @@ public class TestCleanPlanExecutor extends TestCleaner {
 
     HoodieWriteConfig config =
                 HoodieWriteConfig.newBuilder().withPath(basePath)
-                        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+                        .withMetadataConfig(
+                            HoodieMetadataConfig.newBuilder()
+                                .withAssumeDatePartitioning(true)
+                                // Column Stats Index is disabled, since these tests construct tables which are
+                                // not valid (empty commit metadata, invalid parquet files)
+                                .withMetadataIndexColumnStats(false)
+                                .build())
                         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
                         .build();
@@ -422,7 +431,13 @@ public class TestCleanPlanExecutor extends TestCleaner {
 
     HoodieWriteConfig config =
             HoodieWriteConfig.newBuilder().withPath(basePath)
-                    .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+                    .withMetadataConfig(
+                        HoodieMetadataConfig.newBuilder()
+                            .withAssumeDatePartitioning(true)
+                            // Column Stats Index is disabled, since these tests construct tables which are
+                            // not valid (empty commit metadata, invalid parquet files)
+                            .withMetadataIndexColumnStats(false)
+                            .build())
                     .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
                     .build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index ab3d504..1b41769 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,6 +17,13 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -25,6 +32,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -59,19 +67,12 @@ import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.timeline.service.TimelineService;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -82,6 +83,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -91,14 +93,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -571,7 +573,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
       }
     });
     if (doFullValidation) {
-      runFullValidation(writeConfig, metadataTableBasePath, engineContext);
+      runFullValidation(table.getConfig().getMetadataConfig(), writeConfig, metadataTableBasePath, engineContext);
     }
 
     LOG.info("Validation time=" + timer.endTimer());
@@ -644,7 +646,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     assertEquals(metadataFilenames.size(), numFiles);
   }
 
-  private void runFullValidation(HoodieWriteConfig writeConfig, String metadataTableBasePath, HoodieSparkEngineContext engineContext) {
+  private void runFullValidation(HoodieMetadataConfig metadataConfig,
+                                 HoodieWriteConfig writeConfig,
+                                 String metadataTableBasePath,
+                                 HoodieSparkEngineContext engineContext) {
     HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig);
     assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
 
@@ -666,16 +671,25 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     // in the .hoodie folder.
     List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
         false, false);
-    Assertions.assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
+
+    List<MetadataPartitionType> enabledPartitionTypes = metadataWriter.getEnabledPartitionTypes();
+
+    Assertions.assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size());
+
+    Map<String, MetadataPartitionType> partitionTypeMap = enabledPartitionTypes.stream()
+        .collect(Collectors.toMap(MetadataPartitionType::getPartitionPath, Function.identity()));
 
     // Metadata table should automatically compact and clean
     // versions are +1 as autoClean / compaction happens end of commits
     int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
     metadataTablePartitions.forEach(partition -> {
+      MetadataPartitionType partitionType = partitionTypeMap.get(partition);
+
       List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
-      assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
-      assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
+
+      assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).filter(Objects::nonNull).count() <= partitionType.getFileGroupCount(), "Should have a single latest base file");
+      assertTrue(latestSlices.size() <= partitionType.getFileGroupCount(), "Should have a single latest file slice");
       assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
           + numFileVersions + " but was " + latestSlices.size());
     });
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index ece8d24..0fc91b2 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -265,5 +265,11 @@
       <version>1.8.0</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 </project>
diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc
index 4b458bd..a8d7ca7 100644
--- a/hudi-common/src/main/avro/HoodieMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -122,17 +122,195 @@
                             "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
                             "name": "minValue",
                             "type": [
+                                // These types should be aligned with Parquet's `Statistics` impl,
+                                // making sure that we implement consistent semantics across file formats
+                                //
+                                // NOTE: Other logical types (decimal, date, timestamp, etc) will be converted
+                                //       into one of the following types, making sure that their corresponding
+                                //       ordering is preserved
                                 "null",
-                                "string"
-                            ]
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "BooleanWrapper",
+                                    "doc": "A record wrapping boolean type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "boolean",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "IntWrapper",
+                                    "doc": "A record wrapping int type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "int",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "LongWrapper",
+                                    "doc": "A record wrapping long type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "long",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "FloatWrapper",
+                                    "doc": "A record wrapping float type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "float",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "DoubleWrapper",
+                                    "doc": "A record wrapping double type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "double",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "BytesWrapper",
+                                    "doc": "A record wrapping bytes type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "bytes",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "StringWrapper",
+                                    "doc": "A record wrapping string type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": "string",
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "DateWrapper",
+                                    "doc": "A record wrapping Date logical type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": {
+                                                "type": "int"
+                                                // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+                                                //       rely on logical types to do proper encoding of the native Java types,
+                                                //       and hence have to encode the statistic manually
+                                                //"logicalType": "date"
+                                            },
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "DecimalWrapper",
+                                    "doc": "A record wrapping Decimal logical type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": {
+                                                "type": "bytes",
+                                                "logicalType": "decimal",
+                                                // NOTE: This is equivalent to Spark's [[DoubleDecimal]] and should
+                                                //       be enough for almost any possible use-case
+                                                "precision": 30,
+                                                "scale": 15
+                                            },
+                                            "name": "value"
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "TimeMicrosWrapper",
+                                    "doc": "A record wrapping Time-micros logical type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": {
+                                                "type": "long",
+                                                "logicalType": "time-micros"
+                                            },
+                                            "name": "value"
+
+                                        }
+                                    ]
+                                },
+                                {
+                                    "namespace": "org.apache.hudi.avro.model",
+                                    "type": "record",
+                                    "name": "TimestampMicrosWrapper",
+                                    "doc": "A record wrapping Timestamp-micros logical type to be able to be used it w/in Avro's Union",
+                                    "fields": [
+                                        {
+                                            "type": {
+                                                "type": "long"
+                                                // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+                                                //       rely on logical types to do proper encoding of the native Java types,
+                                                //       and hence have to encode the statistic manually
+                                                //"logicalType": "timestamp-micros"
+                                            },
+                                            "name": "value"
+                                        }
+                                    ]
+                                }
+                            ],
+                            "default": null
                         },
                         {
                             "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
                             "name": "maxValue",
                             "type": [
+                                // These types should be aligned with Parquet's `Statistics` impl,
+                                // making sure that we implement consistent semantics across file formats
+                                //
+                                // NOTE: Other logical types (decimal, date, timestamp, etc) will be converted
+                                //       into one of the following types, making sure that their corresponding
+                                //       ordering is preserved
                                 "null",
-                                "string"
-                            ]
+                                "org.apache.hudi.avro.model.BooleanWrapper",
+                                "org.apache.hudi.avro.model.IntWrapper",
+                                "org.apache.hudi.avro.model.LongWrapper",
+                                "org.apache.hudi.avro.model.FloatWrapper",
+                                "org.apache.hudi.avro.model.DoubleWrapper",
+                                "org.apache.hudi.avro.model.BytesWrapper",
+                                "org.apache.hudi.avro.model.StringWrapper",
+                                "org.apache.hudi.avro.model.DateWrapper",
+                                "org.apache.hudi.avro.model.DecimalWrapper",
+                                "org.apache.hudi.avro.model.TimeMicrosWrapper",
+                                "org.apache.hudi.avro.model.TimestampMicrosWrapper"
+                            ],
+                            "default": null
                         },
                         {
                             "doc": "Total count of values",
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java b/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java
new file mode 100644
index 0000000..9d36e21
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.UnresolvedUnionException;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+
+import java.util.Map;
+
+/**
+ * Custom singleton {@link GenericData} model registering conversions for the common Avro logical
+ * types ("decimal", "uuid", "date", "time-micros", "timestamp-micros"); see {@link #INSTANCE}.
+ *
+ * NOTE: Given that this code has to be interoperable w/ Spark 2 (which relies on Avro 1.8.2),
+ *       this model can't support the newer conversions introduced in Avro 1.10 at the moment
+ */
+public class ConvertingGenericData extends GenericData {
+
+  private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+  private static final Conversions.UUIDConversion UUID_CONVERSION = new Conversions.UUIDConversion();
+  private static final TimeConversions.DateConversion DATE_CONVERSION = new TimeConversions.DateConversion();
+  private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
+  private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
+
+  // NOTE: These conversions are not supported in Avro 1.8.2
+  // TODO re-enable upon upgrading to 1.10
+  // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
+  // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
+  // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
+  // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
+  // Must stay below the conversion constants: static initializers run in textual order and the constructor reads them
+  public static final GenericData INSTANCE = new ConvertingGenericData();
+
+  private ConvertingGenericData() { // private: the only instance is INSTANCE
+    addLogicalTypeConversion(DECIMAL_CONVERSION);
+    addLogicalTypeConversion(UUID_CONVERSION);
+    addLogicalTypeConversion(DATE_CONVERSION);
+    addLogicalTypeConversion(TIME_MICROS_CONVERSION);
+    addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
+    // NOTE: These conversions are not supported in Avro 1.8.2
+    // TODO re-enable upon upgrading to 1.10
+    // addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
+    // addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
+    // addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
+    // addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
+  }
+  /** Checks whether {@code datum} conforms to {@code schema}; leaf types also accept the Java type produced by the matching conversion. */
+  @Override
+  public boolean validate(Schema schema, Object datum) {
+    switch (schema.getType()) {
+      case RECORD: // datum must be a record and every field value must validate against its field schema
+        if (!isRecord(datum)) {
+          return false;
+        }
+        for (Schema.Field f : schema.getFields()) {
+          if (!validate(f.schema(), getField(datum, f.name(), f.pos()))) {
+            return false;
+          }
+        }
+        return true;
+      case ENUM: // datum's string form must be one of the schema's symbols
+        if (!isEnum(datum)) {
+          return false;
+        }
+        return schema.getEnumSymbols().contains(datum.toString());
+      case ARRAY: // every element must validate against the element schema
+        if (!(isArray(datum))) {
+          return false;
+        }
+        for (Object element : getArrayAsCollection(datum)) {
+          if (!validate(schema.getElementType(), element)) {
+            return false;
+          }
+        }
+        return true;
+      case MAP: // every value must validate against the value schema (keys are not checked here)
+        if (!(isMap(datum))) {
+          return false;
+        }
+        @SuppressWarnings(value = "unchecked")
+        Map<Object, Object> map = (Map<Object, Object>) datum;
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          if (!validate(schema.getValueType(), entry.getValue())) {
+            return false;
+          }
+        }
+        return true;
+      case UNION: // validate against the branch picked by resolveUnion; an unresolvable datum is invalid
+        try {
+          int i = resolveUnion(schema, datum);
+          return validate(schema.getTypes().get(i), datum);
+        } catch (UnresolvedUnionException e) {
+          return false;
+        }
+      case FIXED: // GenericFixed of exactly the schema's fixed size, or the decimal conversion's Java type
+        return (datum instanceof GenericFixed && ((GenericFixed) datum).bytes().length == schema.getFixedSize())
+            || DECIMAL_CONVERSION.getConvertedType().isInstance(datum);
+      case STRING: // string, or the UUID conversion's Java type
+        return isString(datum)
+            || UUID_CONVERSION.getConvertedType().isInstance(datum);
+      case BYTES: // bytes, or the decimal conversion's Java type
+        return isBytes(datum)
+            || DECIMAL_CONVERSION.getConvertedType().isInstance(datum);
+      case INT: // int, or the date conversion's Java type
+        return isInteger(datum)
+            || DATE_CONVERSION.getConvertedType().isInstance(datum);
+      case LONG: // long, or the Java type of the time-micros / timestamp-micros conversions
+        return isLong(datum)
+            || TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
+            || TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum);
+      case FLOAT:
+        return isFloat(datum);
+      case DOUBLE:
+        return isDouble(datum);
+      case BOOLEAN:
+        return isBoolean(datum);
+      case NULL:
+        return datum == null;
+      default:
+        return false;
+    }
+  }
+}
+
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 90344ce..237851c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -69,6 +69,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.HashMap;
 import java.util.TimeZone;
 import java.util.Iterator;
@@ -82,9 +83,8 @@ import static org.apache.avro.Schema.Type.UNION;
  */
 public class HoodieAvroUtils {
 
-  private static ThreadLocal<BinaryEncoder> reuseEncoder = ThreadLocal.withInitial(() -> null);
-
-  private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
+  private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = ThreadLocal.withInitial(() -> null);
+  private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = ThreadLocal.withInitial(() -> null);
 
   private static final long MILLIS_PER_DAY = 86400000L;
 
@@ -92,9 +92,9 @@ public class HoodieAvroUtils {
   public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
 
   // As per https://avro.apache.org/docs/current/spec.html#names
-  private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
-  private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
-  private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
+  private static final String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
+  private static final String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
+  private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
 
   // All metadata fields are optional strings.
   public static final Schema METADATA_FIELD_SCHEMA =
@@ -110,10 +110,10 @@ public class HoodieAvroUtils {
   }
 
   public static <T extends IndexedRecord> byte[] indexedRecordToBytes(T record) {
-    GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema());
+    GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema(), ConvertingGenericData.INSTANCE);
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, reuseEncoder.get());
-      reuseEncoder.set(encoder);
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, BINARY_ENCODER.get());
+      BINARY_ENCODER.set(encoder);
       writer.write(record, encoder);
       encoder.flush();
       return out.toByteArray();
@@ -148,8 +148,8 @@ public class HoodieAvroUtils {
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {
-    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get());
-    reuseDecoder.set(decoder);
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, BINARY_DECODER.get());
+    BINARY_DECODER.set(decoder);
     GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
     return reader.read(null, decoder);
   }
@@ -391,7 +391,7 @@ public class HoodieAvroUtils {
       }
     }
 
-    if (!GenericData.get().validate(newSchema, newRecord)) {
+    if (!ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
           "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
     }
@@ -429,9 +429,13 @@ public class HoodieAvroUtils {
 
     if (fieldValue != null) {
       // In case field's value is a nested record, we have to rewrite it as well
-      Object newFieldValue = fieldValue instanceof GenericRecord
-          ? rewriteRecord((GenericRecord) fieldValue, resolveNullableSchema(field.schema()))
-          : fieldValue;
+      Object newFieldValue;
+      if (fieldValue instanceof GenericRecord) {
+        GenericRecord record = (GenericRecord) fieldValue;
+        newFieldValue = rewriteRecord(record, resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
+      } else {
+        newFieldValue = fieldValue;
+      }
       newRecord.put(field.name(), newFieldValue);
     } else if (field.defaultVal() instanceof JsonProperties.Null) {
       newRecord.put(field.name(), null);
@@ -519,6 +523,56 @@ public class HoodieAvroUtils {
   }
 
   /**
+   * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+   *
+   * @param record    - record containing the value of the given field
+   * @param fieldName - name of the field
+   * @return
+   */
+  public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+    String[] parts = fieldName.split("\\.");
+    GenericRecord valueNode = record;
+    int i = 0;
+    for (; i < parts.length; i++) {
+      String part = parts[i];
+      Object val = valueNode.get(part);
+
+      if (i == parts.length - 1) {
+        return resolveNullableSchema(valueNode.getSchema().getField(part).schema());
+      } else {
+        if (!(val instanceof GenericRecord)) {
+          throw new HoodieException("Cannot find a record at part value :" + part);
+        }
+        valueNode = (GenericRecord) val;
+      }
+    }
+    throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+  }
+
+
+  /**
+   * Get schema for the given field from the write schema (prefer {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)} when record is available)
+   * @param writeSchema - write schema of the record
+   * @param fieldName   - name of the field; can be nested, denoted by dot notation (e.g: a.b.c), every part being looked up in the schema of the preceding one
+   * @return schema of the field passed through {@code resolveNullableSchema}; {@link HoodieException} is thrown if any part can't be found
+   */
+  public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+    String[] parts = fieldName.split("\\.");
+    Schema currentSchema = writeSchema;
+    for (int i = 0; i < parts.length; i++) {
+      Schema.Field field = currentSchema.getType() == Schema.Type.RECORD ? currentSchema.getField(parts[i]) : null;
+      if (field == null) {
+        break;
+      }
+      currentSchema = resolveNullableSchema(field.schema());
+      if (i == parts.length - 1) {
+        return currentSchema;
+      }
+    }
+    throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+  }
+
+  /**
    * Returns the string value of the given record {@code rec} and field {@code fieldName}.
    * The field and value both could be missing.
    *
@@ -653,7 +707,27 @@ public class HoodieAvroUtils {
     return getRecordColumnValues(record, columns, schema.get(), consistentLogicalTimestampEnabled);
   }
 
-  private static Schema resolveNullableSchema(Schema schema) {
+  /**
+   * Picks the non-null member of the UNION {@code schema} whose full name equals {@code fieldSchemaFullName}
+   * @param schema              - schema to resolve; non-UNION one is returned as is
+   * @param fieldSchemaFullName - full name the picked member has to bear
+   * @throws AvroRuntimeException if UNION has no non-null member bearing such name
+   */
+  private static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+
+    for (Schema memberSchema : schema.getTypes()) {
+      if (memberSchema.getType() != Schema.Type.NULL && Objects.equals(memberSchema.getFullName(), fieldSchemaFullName)) {
+        return memberSchema;
+      }
+    }
+    throw new AvroRuntimeException(
+        String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+  }
+
+  public static Schema resolveNullableSchema(Schema schema) {
     if (schema.getType() != Schema.Type.UNION) {
       return schema;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 18827c6..020fcc2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -44,7 +44,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
   public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
 
   public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt) {
-    super(schema, avroSchema);
+    super(schema, avroSchema, ConvertingGenericData.INSTANCE);
     this.bloomFilterOpt = bloomFilterOpt;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 7c9b7cc..86bb320 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -585,12 +586,21 @@ public class FSUtils {
   }
 
   public static Path getPartitionPath(String basePath, String partitionPath) {
-    return getPartitionPath(new Path(basePath), partitionPath);
+    if (StringUtils.isNullOrEmpty(partitionPath)) {
+      return new Path(basePath);
+    }
+
+    // NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
+    //       absolute path
+    String properPartitionPath = partitionPath.startsWith("/")
+        ? partitionPath.substring(1)
+        : partitionPath;
+    return getPartitionPath(new Path(basePath), properPartitionPath);
   }
 
   public static Path getPartitionPath(Path basePath, String partitionPath) {
     // FOr non-partitioned table, return only base-path
-    return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath : new Path(basePath, partitionPath);
+    return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 2afbd19..e3c5a70 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -20,10 +20,7 @@ package org.apache.hudi.common.model;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.Comparator;
 import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.stream.Stream;
 
 /**
  * Hoodie metadata for the column range of data stored in columnar format (like Parquet)
@@ -45,23 +42,6 @@ public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializ
   private final long totalSize;
   private final long totalUncompressedSize;
 
-  public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
-      (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
-          newColumnRange.getFilePath(),
-          newColumnRange.getColumnName(),
-          (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
-              .filter(Objects::nonNull)
-              .min(Comparator.naturalOrder())
-              .orElse(null),
-          (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
-              .filter(Objects::nonNull)
-              .max(Comparator.naturalOrder()).orElse(null),
-          oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
-          oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
-          oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
-          oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
-      );
-
   private HoodieColumnRangeMetadata(String filePath,
                                     String columnName,
                                     @Nullable T minValue,
@@ -168,18 +148,4 @@ public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializ
                                                            String columnName) {
     return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
   }
-
-  /**
-   * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
-   */
-  public static final class Stats {
-    public static final String VALUE_COUNT = "value_count";
-    public static final String NULL_COUNT = "null_count";
-    public static final String MIN = "min";
-    public static final String MAX = "max";
-    public static final String TOTAL_SIZE = "total_size";
-    public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";
-
-    private Stats() {}
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 09996db..53ceb00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -130,8 +130,9 @@ public class HoodieCommitMetadata implements Serializable {
   public HashMap<String, String> getFileIdAndFullPaths(String basePath) {
     HashMap<String, String> fullPaths = new HashMap<>();
     for (Map.Entry<String, String> entry : getFileIdAndRelativePaths().entrySet()) {
-      String fullPath =
-          (entry.getValue() != null) ? (FSUtils.getPartitionPath(basePath, entry.getValue())).toString() : null;
+      String fullPath = entry.getValue() != null
+          ? FSUtils.getPartitionPath(basePath, entry.getValue()).toString()
+          : null;
       fullPaths.put(entry.getKey(), fullPath);
     }
     return fullPaths;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
index cf3bb52..9626e21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.model;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.hudi.common.util.Option;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,13 +29,14 @@ import java.util.Map;
  * Statistics about a single Hoodie delta log operation.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
+@SuppressWarnings("rawtypes")
 public class HoodieDeltaWriteStat extends HoodieWriteStat {
 
   private int logVersion;
   private long logOffset;
   private String baseFile;
   private List<String> logFiles = new ArrayList<>();
-  private Option<RecordsStats<? extends Map>> recordsStats = Option.empty();
+  private Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> recordsStats = Option.empty();
 
   public void setLogVersion(int logVersion) {
     this.logVersion = logVersion;
@@ -74,23 +74,11 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat {
     return logFiles;
   }
 
-  public void setRecordsStats(RecordsStats<? extends Map> stats) {
+  public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
     recordsStats = Option.of(stats);
   }
 
-  public Option<RecordsStats<? extends Map>> getRecordsStats() {
+  public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
     return recordsStats;
   }
-
-  public static class RecordsStats<T> implements Serializable {
-    private final T recordsStats;
-
-    public RecordsStats(T recordsStats) {
-      this.recordsStats = recordsStats;
-    }
-
-    public T getStats() {
-      return recordsStats;
-    }
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index 531a090..cf90eff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -40,6 +40,35 @@ public class DateTimeUtils {
       Collections.unmodifiableMap(initMap());
 
   /**
+   * Converts provided microseconds (from epoch) to {@link Instant}
+   */
+  public static Instant microsToInstant(long microsFromEpoch) {
+    long epochSeconds = microsFromEpoch / (1_000_000L);
+    long nanoAdjustment = (microsFromEpoch % (1_000_000L)) * 1_000L;
+
+    return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+  }
+
+  /**
+   * Converts provided {@link Instant} to microseconds (from epoch), truncating sub-microsecond part of it
+   *
+   * @throws ArithmeticException if the result overflows {@code long}
+   */
+  public static long instantToMicros(Instant instant) {
+    long epochSecond = instant.getEpochSecond();
+    long microsOfSecond = instant.getNano() / 1_000L;
+
+    if (epochSecond >= 0 || instant.getNano() <= 0) {
+      return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000L), microsOfSecond);
+    }
+
+    // NOTE: Instants preceding the epoch are counted back from the following second, keeping the
+    //       adjustment negative (presumably to stay clear of overflowing next to {@code Long.MIN_VALUE})
+    long microsOfNextSecond = Math.multiplyExact(epochSecond + 1, 1_000_000L);
+    return Math.addExact(microsOfNextSecond, microsOfSecond - 1_000_000);
+  }
+
+  /**
    * Parse input String to a {@link java.time.Instant}.
    *
    * @param s Input String should be Epoch time in millisecond or ISO-8601 format.
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 0f45997..01618c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -18,10 +18,30 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.BooleanWrapper;
+import org.apache.hudi.avro.model.BytesWrapper;
+import org.apache.hudi.avro.model.DateWrapper;
+import org.apache.hudi.avro.model.DecimalWrapper;
+import org.apache.hudi.avro.model.DoubleWrapper;
+import org.apache.hudi.avro.model.FloatWrapper;
 import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.LongWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.avro.model.TimestampMicrosWrapper;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -35,31 +55,33 @@ import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.io.storage.HoodieHFileReader;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
+import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartition;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
 
 /**
  * MetadataTable records are persisted with the schema defined in HoodieMetadata.avsc.
@@ -119,6 +141,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   public static final String COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE = "totalUncompressedSize";
   public static final String COLUMN_STATS_FIELD_IS_DELETED = FIELD_IS_DELETED;
 
+  private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+
   private String key = null;
   private int type = 0;
   private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
@@ -180,8 +204,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
           columnStatMetadata = HoodieMetadataColumnStats.newBuilder()
               .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
               .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
-              .setMinValue((String) columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
-              .setMaxValue((String) columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))
+              .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
+              .setMaxValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))
               .setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
               .setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
               .setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
@@ -351,7 +375,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
     HoodieMetadataColumnStats previousColStatsRecord = previousRecord.getColumnStatMetadata().get();
     HoodieMetadataColumnStats newColumnStatsRecord = getColumnStatMetadata().get();
 
-    return HoodieTableMetadataUtil.mergeColumnStats(previousColStatsRecord, newColumnStatsRecord);
+    return mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord);
   }
 
   @Override
@@ -531,29 +555,69 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
     return getColumnStatsIndexKey(partitionIndexID, fileIndexID, columnIndexID);
   }
 
-  public static Stream<HoodieRecord> createColumnStatsRecords(
-      String partitionName, Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList, boolean isDeleted) {
+  public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
+                                                              Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                                              boolean isDeleted) {
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata),
           MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+
       HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
               .setFileName(new Path(columnRangeMetadata.getFilePath()).getName())
               .setColumnName(columnRangeMetadata.getColumnName())
-              .setMinValue(columnRangeMetadata.getMinValue() == null ? null :
-                  columnRangeMetadata.getMinValue().toString())
-              .setMaxValue(columnRangeMetadata.getMaxValue() == null ? null :
-                  columnRangeMetadata.getMaxValue().toString())
+              .setMinValue(wrapStatisticValue(columnRangeMetadata.getMinValue()))
+              .setMaxValue(wrapStatisticValue(columnRangeMetadata.getMaxValue()))
               .setNullCount(columnRangeMetadata.getNullCount())
               .setValueCount(columnRangeMetadata.getValueCount())
               .setTotalSize(columnRangeMetadata.getTotalSize())
               .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
               .build());
+
       return new HoodieAvroRecord<>(key, payload);
     });
   }
 
+  /**
+   * Merges column stats records of the same file and column: min/max values are picked across both of the
+   * records (absent ones being ignored), while value/null counts and sizes are summed up. In case new record
+   * is marked as deleted, it's returned as is
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
+                                                                   HoodieMetadataColumnStats newColumnStats) {
+    checkArgument(Objects.equals(prevColumnStats.getFileName(), newColumnStats.getFileName()));
+    checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName()));
+
+    if (newColumnStats.getIsDeleted()) {
+      return newColumnStats;
+    }
+
+    Comparable minValue = (Comparable) Stream.of(
+            (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMinValue()),
+            (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMinValue()))
+        .filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null);
+
+    // NOTE: Upper bound has to be derived from the max values of both records (not from the min ones)
+    Comparable maxValue = (Comparable) Stream.of(
+            (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMaxValue()),
+            (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMaxValue()))
+        .filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null);
+
+    return HoodieMetadataColumnStats.newBuilder()
+        .setFileName(newColumnStats.getFileName())
+        .setColumnName(newColumnStats.getColumnName())
+        .setMinValue(wrapStatisticValue(minValue))
+        .setMaxValue(wrapStatisticValue(maxValue))
+        .setValueCount(prevColumnStats.getValueCount() + newColumnStats.getValueCount())
+        .setNullCount(prevColumnStats.getNullCount() + newColumnStats.getNullCount())
+        .setTotalSize(prevColumnStats.getTotalSize() + newColumnStats.getTotalSize())
+        .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
+        .setIsDeleted(newColumnStats.getIsDeleted())
+        .build();
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
@@ -579,6 +643,85 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
     return sb.toString();
   }
 
+  /**
+   * Wraps statistic's value into the wrapper record matching its type ({@code null} is passed through); {@link UnsupportedOperationException} is thrown for other types
+   */
+  private static Object wrapStatisticValue(Comparable<?> statValue) {
+    if (statValue == null) {
+      return null;
+    } else if (statValue instanceof Date || statValue instanceof LocalDate) {
+      // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+      //       rely on logical types to do proper encoding of the native Java types,
+      //       and hereby have to encode statistic manually
+      LocalDate localDate = statValue instanceof LocalDate
+          ? (LocalDate) statValue
+          : ((Date) statValue).toLocalDate();
+      return DateWrapper.newBuilder().setValue((int) localDate.toEpochDay()).build();
+    } else if (statValue instanceof BigDecimal) {
+      Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+      BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType());
+      return DecimalWrapper.newBuilder()
+          .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType()))
+          .build();
+    } else if (statValue instanceof Timestamp || statValue instanceof Instant) {
+      // NOTE: Encoded manually for the same reason as dates above. {@link Instant} has to be accepted as well, since
+      //       that's what {@link #unwrapStatisticValueWrapper} decodes timestamps into (which are re-wrapped upon merging)
+      Instant instant = statValue instanceof Instant ? (Instant) statValue : ((Timestamp) statValue).toInstant();
+      return TimestampMicrosWrapper.newBuilder().setValue(instantToMicros(instant)).build();
+    } else if (statValue instanceof Boolean) {
+      return BooleanWrapper.newBuilder().setValue((Boolean) statValue).build();
+    } else if (statValue instanceof Integer) {
+      return IntWrapper.newBuilder().setValue((Integer) statValue).build();
+    } else if (statValue instanceof Long) {
+      return LongWrapper.newBuilder().setValue((Long) statValue).build();
+    } else if (statValue instanceof Float) {
+      return FloatWrapper.newBuilder().setValue((Float) statValue).build();
+    } else if (statValue instanceof Double) {
+      return DoubleWrapper.newBuilder().setValue((Double) statValue).build();
+    } else if (statValue instanceof ByteBuffer) {
+      return BytesWrapper.newBuilder().setValue((ByteBuffer) statValue).build();
+    } else if (statValue instanceof String || statValue instanceof Utf8) {
+      return StringWrapper.newBuilder().setValue(statValue.toString()).build();
+    } else {
+      throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass()));
+    }
+  }
+
+  /** Extracts the value held by the statistic's wrapper record ({@code null} is passed through); throws {@link UnsupportedOperationException} for unknown wrappers */
+  public static Comparable<?> unwrapStatisticValueWrapper(Object statValueWrapper) {
+    if (statValueWrapper == null) {
+      return null;
+    } else if (statValueWrapper instanceof DateWrapper) {
+      return LocalDate.ofEpochDay(((DateWrapper) statValueWrapper).getValue());
+    } else if (statValueWrapper instanceof DecimalWrapper) {
+      Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+      return AVRO_DECIMAL_CONVERSION.fromBytes(((DecimalWrapper) statValueWrapper).getValue(), valueSchema, valueSchema.getLogicalType());
+    } else if (statValueWrapper instanceof TimestampMicrosWrapper) {
+      return microsToInstant(((TimestampMicrosWrapper) statValueWrapper).getValue());
+    } else if (statValueWrapper instanceof BooleanWrapper) {
+      return ((BooleanWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof IntWrapper) {
+      return ((IntWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof LongWrapper) {
+      return ((LongWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof FloatWrapper) {
+      return ((FloatWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof DoubleWrapper) {
+      return ((DoubleWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof BytesWrapper) {
+      return ((BytesWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof StringWrapper) {
+      return ((StringWrapper) statValueWrapper).getValue();
+    } else if (statValueWrapper instanceof GenericRecord) {
+      // NOTE: This branch could be hit b/c Avro records could be reconstructed as {@code GenericRecord}
+      // TODO add logical type decoding
+      GenericRecord record = (GenericRecord) statValueWrapper;
+      return (Comparable<?>) record.get("value");
+    } else {
+      throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValueWrapper.getClass()));
+    }
+  }
+
   private static void validatePayload(int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
     if (type == METADATA_TYPE_FILE_LIST) {
       filesystemMetadata.forEach((fileName, fileInfo) -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 9e3eca3..63271cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,14 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -55,18 +63,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nonnull;
-
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -77,23 +80,19 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
+import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
 import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 
@@ -109,6 +108,100 @@ public class HoodieTableMetadataUtil {
   protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
 
   /**
+   * Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, treating
+   * them as if they had been persisted within the given {@code filePath}
+   *
+   * @param records target records to compute column range metadata for
+   * @param targetFields columns (fields) to be collected
+   * @param filePath file path value required for {@link HoodieColumnRangeMetadata}
+   *
+   * @return map of {@link HoodieColumnRangeMetadata} for each of the provided target fields for
+   *         the collection of provided records
+   */
+  public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records,
+                                                                                              List<Schema.Field> targetFields,
+                                                                                              String filePath) {
+    // Helper class to calculate column stats
+    class ColumnStats {
+      Object minValue;
+      Object maxValue;
+      long nullCount;
+      long valueCount;
+    }
+
+    HashMap<String, ColumnStats> allColumnStats = new HashMap<>();
+
+    // Collect stats for all columns by iterating through the records and accumulating
+    // the corresponding stats
+    records.forEach((record) -> {
+      // For each column (field) to be indexed, update its stats with the value from this record;
+      // NOTE: values that are null or not comparable (see canCompare) are tallied in nullCount
+      targetFields.forEach(field -> {
+        ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), (ignored) -> new ColumnStats());
+
+        GenericRecord genericRecord = (GenericRecord) record;
+
+        final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), true);
+        final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
+
+        if (fieldVal != null && canCompare(fieldSchema)) {
+          // Set the min value of the field
+          if (colStats.minValue == null
+              || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.minValue, fieldSchema) < 0) {
+            colStats.minValue = fieldVal;
+          }
+
+          // Set the max value of the field
+          if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
+            colStats.maxValue = fieldVal;
+          }
+
+          colStats.valueCount++;
+        } else {
+          colStats.nullCount++;
+        }
+      });
+    });
+
+    Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
+        Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
+
+    return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
+      .map(field -> {
+        ColumnStats colStats = allColumnStats.get(field.name());
+        return HoodieColumnRangeMetadata.<Comparable>create(
+            filePath,
+            field.name(),
+            colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
+            colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
+            colStats == null ? 0 : colStats.nullCount,
+            colStats == null ? 0 : colStats.valueCount,
+            // NOTE: Size and compressed size statistics are set to 0 to make sure we're not
+            //       mixing up those provided by Parquet with the ones from other encodings,
+            //       since those are not directly comparable
+            0,
+            0
+        );
+      })
+      .collect(collector);
+  }
+
+  /**
+   * Converts instance of {@link HoodieMetadataColumnStats} to {@link HoodieColumnRangeMetadata}
+   */
+  public static HoodieColumnRangeMetadata<Comparable> convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats columnStats) {
+    return HoodieColumnRangeMetadata.<Comparable>create(
+        columnStats.getFileName(),
+        columnStats.getColumnName(),
+        unwrapStatisticValueWrapper(columnStats.getMinValue()),
+        unwrapStatisticValueWrapper(columnStats.getMaxValue()),
+        columnStats.getNullCount(),
+        columnStats.getValueCount(),
+        columnStats.getTotalSize(),
+        columnStats.getTotalUncompressedSize());
+  }
+
+  /**
    * Delete the metadata table for the dataset. This will be invoked during upgrade/downgrade operation during which
    * no other
    * process should be running.
@@ -457,8 +550,11 @@ public class HoodieTableMetadataUtil {
     int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
     return engineContext.parallelize(deleteFileList, parallelism)
         .flatMap(deleteFileInfoPair -> {
-          if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, true).iterator();
+          String partitionPath = deleteFileInfoPair.getLeft();
+          String filePath = deleteFileInfoPair.getRight();
+
+          if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+            return getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator();
           }
           return Collections.emptyListIterator();
         });
@@ -531,7 +627,8 @@ public class HoodieTableMetadataUtil {
     }
 
     if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
-      final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
+      final HoodieData<HoodieRecord> metadataColumnStatsRDD =
+          convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
     }
 
@@ -815,7 +912,7 @@ public class HoodieTableMetadataUtil {
 
       return deletedFileList.stream().flatMap(deletedFile -> {
         final String filePathWithPartition = partitionName + "/" + deletedFile;
-        return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
+        return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
       }).iterator();
     });
     allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
@@ -836,7 +933,7 @@ public class HoodieTableMetadataUtil {
           return Stream.empty();
         }
         final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey();
-        return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
+        return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
       }).iterator();
 
     });
@@ -1014,63 +1111,65 @@ public class HoodieTableMetadataUtil {
     return Arrays.asList(tableConfig.getRecordKeyFields().get());
   }
 
-  public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats prevColumnStatsRecord,
-                                                           HoodieMetadataColumnStats newColumnStatsRecord) {
-    checkArgument(prevColumnStatsRecord.getFileName().equals(newColumnStatsRecord.getFileName()));
-    checkArgument(prevColumnStatsRecord.getColumnName().equals(newColumnStatsRecord.getColumnName()));
-
-    if (newColumnStatsRecord.getIsDeleted()) {
-      return newColumnStatsRecord;
+  private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
+                                                                     HoodieTableMetaClient datasetMetaClient,
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getColumnStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
+      Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnRangeMap.values();
+      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
     }
 
-    return HoodieMetadataColumnStats.newBuilder()
-        .setFileName(newColumnStatsRecord.getFileName())
-        .setColumnName(newColumnStatsRecord.getColumnName())
-        .setMinValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
-        .setMaxValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
-        .setValueCount(prevColumnStatsRecord.getValueCount() + newColumnStatsRecord.getValueCount())
-        .setNullCount(prevColumnStatsRecord.getNullCount() + newColumnStatsRecord.getNullCount())
-        .setTotalSize(prevColumnStatsRecord.getTotalSize() + newColumnStatsRecord.getTotalSize())
-        .setTotalUncompressedSize(prevColumnStatsRecord.getTotalUncompressedSize() + newColumnStatsRecord.getTotalUncompressedSize())
-        .setIsDeleted(newColumnStatsRecord.getIsDeleted())
-        .build();
+    return getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
   }
 
-  public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
-                                                                     HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> columnsToIndex) {
-    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
-      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
-      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+  private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
+                                                            String filePath,
+                                                            HoodieTableMetaClient datasetMetaClient,
+                                                            List<String> columnsToIndex,
+                                                            boolean isDeleted) {
+    String partitionName = getPartition(partitionPath);
+    // NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
+    //       absolute path
+    String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
+    String fileName = partitionName.equals(NON_PARTITIONED_NAME)
+        ? filePartitionPath
+        : filePartitionPath.substring(partitionName.length() + 1);
+
+    if (isDeleted) {
+      // TODO we should delete records instead of stubbing them
+      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnsToIndex.stream()
+          .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
+          .collect(Collectors.toList());
+
+      return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true);
     }
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+
+    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
+        readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex);
+
+    return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadata, false);
   }
 
-  private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
-                                                     HoodieTableMetaClient datasetMetaClient,
-                                                     List<String> columnsToIndex,
-                                                     boolean isDeleted) {
-    final String partition = getPartition(partitionPath);
-    final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 1 : 0)
-        : partition.length() + 1;
-    final String fileName = filePathWithPartition.substring(offset);
-
-    if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-      final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
-      List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList;
-      if (!isDeleted) {
-        columnRangeMetadataList = new ParquetUtils().readRangeFromParquetMetadata(
-            datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
-      } else {
-        // TODO we should delete records instead of stubbing them
-        columnRangeMetadataList =
-            columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
-                .collect(Collectors.toList());
+  private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String filePath,
+                                                                                         HoodieTableMetaClient datasetMetaClient,
+                                                                                         List<String> columnsToIndex) {
+    try {
+      if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath);
+        List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
+            new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
+
+        return columnRangeMetadataList;
       }
-      return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
-    } else {
-      throw new HoodieException("Column range index not supported for filePathWithPartition " + fileName);
+
+      LOG.warn("Column range index not supported for: " + filePath);
+      return Collections.emptyList();
+    } catch (Exception e) {
+      // NOTE: In case reading column range metadata from an individual file fails, we log the
+      //       failure (with its cause) and fall back to an empty list, in lieu of failing the whole task
+      LOG.error("Failed to fetch column range metadata for: " + filePath, e);
+      return Collections.emptyList();
     }
   }
 
@@ -1105,72 +1204,37 @@ public class HoodieTableMetadataUtil {
   }
 
   /**
-   * Accumulates column range metadata for the given field and updates the column range map.
-   *
-   * @param field          - column for which statistics will be computed
-   * @param filePath       - data file path
-   * @param columnRangeMap - old column range statistics, which will be merged in this computation
-   * @param columnToStats  - map of column to map of each stat and its value
-   */
-  public static void accumulateColumnRanges(Schema.Field field, String filePath,
-                                            Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
-                                            Map<String, Map<String, Object>> columnToStats) {
-    Map<String, Object> columnStats = columnToStats.get(field.name());
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
-        filePath,
-        field.name(),
-        (Comparable) String.valueOf(columnStats.get(MIN)),
-        (Comparable) String.valueOf(columnStats.get(MAX)),
-        Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
-        Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
-        Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
-        Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
-    );
-    columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
-  }
-
-  /**
-   * Aggregates column stats for each field.
-   *
-   * @param record                            - current record
-   * @param fields                            - fields for which stats will be aggregated
-   * @param columnToStats                     - map of column to map of each stat and its value which gets updates in this method
-   * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+   * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by
+   * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
    */
-  public static void aggregateColumnStats(IndexedRecord record, List<Schema.Field> fields,
-                                          Map<String, Map<String, Object>> columnToStats,
-                                          boolean consistentLogicalTimestampEnabled) {
-    if (!(record instanceof GenericRecord)) {
-      throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+  public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) {
+    final int scale = decimal.getScale();
+    final int valueScale = value.scale();
+
+    boolean scaleAdjusted = false;
+    if (valueScale != scale) {
+      try {
+        value = value.setScale(scale, RoundingMode.UNNECESSARY);
+        scaleAdjusted = true;
+      } catch (ArithmeticException aex) {
+        throw new AvroTypeException(
+            "Cannot encode decimal with scale " + valueScale + " as scale " + scale + " without rounding", aex);
+      }
     }
 
-    fields.forEach(field -> {
-      Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
-      final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
-      // update stats
-      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
-      columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
-      columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
-
-      if (!isNullOrEmpty(fieldVal)) {
-        // set the min value of the field
-        if (!columnStats.containsKey(MIN)) {
-          columnStats.put(MIN, fieldVal);
-        }
-        if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
-          columnStats.put(MIN, fieldVal);
-        }
-        // set the max value of the field
-        if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
-          columnStats.put(MAX, fieldVal);
-        }
-        // increment non-null value count
-        columnStats.put(VALUE_COUNT, Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()) + 1);
+    int precision = decimal.getPrecision();
+    int valuePrecision = value.precision();
+    if (valuePrecision > precision) {
+      if (scaleAdjusted) {
+        throw new AvroTypeException("Cannot encode decimal with precision " + valuePrecision + " as max precision "
+            + precision + ". This is after safely adjusting scale from " + valueScale + " to required " + scale);
       } else {
-        // increment null value count
-        columnStats.put(NULL_COUNT, Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()) + 1);
+        throw new AvroTypeException(
+            "Cannot encode decimal with precision " + valuePrecision + " as max precision " + precision);
       }
-    });
+    }
+
+    return value;
   }
 
   private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
@@ -1178,14 +1242,82 @@ public class HoodieTableMetadataUtil {
       return Option.empty();
     }
 
-    TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
     try {
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
       return Option.of(schemaResolver.getTableAvroSchema());
     } catch (Exception e) {
       throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
     }
   }
 
+  /**
+   * Given a schema, coerces the provided value to an instance of {@link Comparable} such that
+   * it can subsequently be used in column stats
+   *
+   * NOTE: This method has to stay compatible with the semantic of
+   *      {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem
+   */
+  private static Comparable<?> coerceToComparable(Schema schema, Object val) {
+    if (val == null) {
+      return null;
+    }
+
+    switch (schema.getType()) {
+      case UNION:
+        // TODO we need to handle unions in general case as well
+        return coerceToComparable(resolveNullableSchema(schema), val);
+
+      case FIXED:
+      case BYTES:
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          return (Comparable<?>) val;
+        }
+        return (ByteBuffer) val;
+
+
+      case INT:
+        if (schema.getLogicalType() == LogicalTypes.date()
+            || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+          // NOTE: This type will be either {@code java.sql.Date} or {org.joda.LocalDate}
+          //       depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
+          return (Comparable<?>) val;
+        }
+        return (Integer) val;
+
+      case LONG:
+        if (schema.getLogicalType() == LogicalTypes.timeMicros()
+            || schema.getLogicalType() == LogicalTypes.timestampMicros()
+            || schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+          // NOTE(review): Runtime type of time/timestamp values presumably depends on the Avro
+          //       version as well (confirm). Hence, we simply cast it to {@code Comparable<?>}
+          return (Comparable<?>) val;
+        }
+        return (Long) val;
+
+      case STRING:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+        return (Comparable<?>) val;
+
+
+      // TODO add support for those types
+      case ENUM:
+      case MAP:
+      case NULL:
+      case RECORD:
+      case ARRAY:
+        return null;
+
+      default:
+        throw new IllegalStateException("Unexpected type: " + schema.getType());
+    }
+  }
+
+  private static boolean canCompare(Schema schema) {
+    return schema.getType() != Schema.Type.MAP;
+  }
+
   public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
     return StringUtils.toSet(tableConfig.getMetadataPartitionsInflight());
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 9fb268e..85505c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -31,6 +31,8 @@ public enum MetadataPartitionType {
   // FileId prefix used for all file groups in this partition.
   private final String fileIdPrefix;
   // Total file groups
+  // TODO fix: enum should not have any mutable aspect as this compromises the whole idea
+  //      of the enum being a static, immutable entity
   private int fileGroupCount = 1;
 
   MetadataPartitionType(final String partitionPath, final String fileIdPrefix) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index e64964e..8c57dc8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.avro.JsonProperties;
@@ -27,12 +28,14 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -88,6 +91,12 @@ public class TestHoodieAvroUtils {
       + "{\"name\":\"decimal_col\",\"type\":[\"null\","
       + "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
 
+  private static String SCHEMA_WITH_NESTED_FIELD = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":\"string\"},"
+      + "{\"name\":\"lastname\",\"type\":\"string\"},"
+      + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+
   @Test
   public void testPropsPresent() {
     Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -248,7 +257,7 @@ public class TestHoodieAvroUtils {
   }
 
   @Test
-  public void testGetNestedFieldValWithDecimalFiled() {
+  public void testGetNestedFieldValWithDecimalField() {
     GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD));
     rec.put("key_col", "key");
     BigDecimal bigDecimal = new BigDecimal("1234.5678");
@@ -264,4 +273,36 @@ public class TestHoodieAvroUtils {
     assertEquals(0, buffer.position());
   }
 
+  @Test
+  public void testGetNestedFieldSchema() throws IOException {
+    Schema schema = SchemaTestUtil.getEvolvedSchema();
+    GenericRecord rec = new GenericData.Record(schema);
+    rec.put("field1", "key1");
+    rec.put("field2", "val1");
+    rec.put("name", "val2");
+    rec.put("favorite_number", 2);
+    // test simple field schema
+    assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec.getSchema(), "field1"));
+
+    GenericRecord rec2 = new GenericData.Record(schema);
+    rec2.put("field1", "key1");
+    rec2.put("field2", "val1");
+    rec2.put("name", "val2");
+    rec2.put("favorite_number", 12);
+    // test comparison of non-string type
+    assertEquals(-1, GenericData.get().compare(rec.get("favorite_number"), rec2.get("favorite_number"), getNestedFieldSchemaFromWriteSchema(rec.getSchema(), "favorite_number")));
+
+    // test nested field schema
+    Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
+    GenericRecord rec3 = new GenericData.Record(nestedSchema);
+    rec3.put("firstname", "person1");
+    rec3.put("lastname", "person2");
+    GenericRecord studentRecord = new GenericData.Record(rec3.getSchema().getField("student").schema());
+    studentRecord.put("firstname", "person1");
+    studentRecord.put("lastname", "person2");
+    rec3.put("student", studentRecord);
+
+    assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec3.getSchema(), "student.firstname"));
+    assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname"));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 5b3dabd..71917f9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -1749,40 +1749,39 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
-    HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
-        fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
-        bufferSize, readBlocksLazily, true);
+    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
 
-    assertTrue(reader.hasPrev(), "Last block should be available");
-    HoodieLogBlock prevBlock = reader.prev();
-    HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
+      assertTrue(reader.hasPrev(), "Last block should be available");
+      HoodieLogBlock prevBlock = reader.prev();
+      HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
 
-    List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords3.size(), recordsRead1.size(),
-        "Third records size should be equal to the written records size");
-    assertEquals(copyOfRecords3, recordsRead1,
-        "Both records lists should be the same. (ordering guaranteed)");
+      List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
+      assertEquals(copyOfRecords3.size(), recordsRead1.size(),
+          "Third records size should be equal to the written records size");
+      assertEquals(copyOfRecords3, recordsRead1,
+          "Both records lists should be the same. (ordering guaranteed)");
 
-    assertTrue(reader.hasPrev(), "Second block should be available");
-    prevBlock = reader.prev();
-    dataBlockRead = (HoodieDataBlock) prevBlock;
-    List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords2.size(), recordsRead2.size(),
-        "Read records size should be equal to the written records size");
-    assertEquals(copyOfRecords2, recordsRead2,
-        "Both records lists should be the same. (ordering guaranteed)");
+      assertTrue(reader.hasPrev(), "Second block should be available");
+      prevBlock = reader.prev();
+      dataBlockRead = (HoodieDataBlock) prevBlock;
+      List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
+      assertEquals(copyOfRecords2.size(), recordsRead2.size(),
+          "Read records size should be equal to the written records size");
+      assertEquals(copyOfRecords2, recordsRead2,
+          "Both records lists should be the same. (ordering guaranteed)");
 
-    assertTrue(reader.hasPrev(), "First block should be available");
-    prevBlock = reader.prev();
-    dataBlockRead = (HoodieDataBlock) prevBlock;
-    List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords1.size(), recordsRead3.size(),
-        "Read records size should be equal to the written records size");
-    assertEquals(copyOfRecords1, recordsRead3,
-        "Both records lists should be the same. (ordering guaranteed)");
+      assertTrue(reader.hasPrev(), "First block should be available");
+      prevBlock = reader.prev();
+      dataBlockRead = (HoodieDataBlock) prevBlock;
+      List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
+      assertEquals(copyOfRecords1.size(), recordsRead3.size(),
+          "Read records size should be equal to the written records size");
+      assertEquals(copyOfRecords1, recordsRead3,
+          "Both records lists should be the same. (ordering guaranteed)");
 
-    assertFalse(reader.hasPrev());
-    reader.close();
+      assertFalse(reader.hasPrev());
+    }
   }
 
   @ParameterizedTest
@@ -1830,19 +1829,20 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.close();
 
     // First round of reads - we should be able to read the first block and then EOF
-    HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
-            fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, true);
+    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
 
-    assertTrue(reader.hasPrev(), "Last block should be available");
-    HoodieLogBlock block = reader.prev();
-    assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
+    try (HoodieLogFileReader reader =
+        new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
 
-    assertTrue(reader.hasPrev(), "Last block should be available");
-    assertThrows(CorruptedLogFileException.class, () -> {
-      reader.prev();
-    });
-    reader.close();
+      assertTrue(reader.hasPrev(), "Last block should be available");
+      HoodieLogBlock block = reader.prev();
+      assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
+
+      assertTrue(reader.hasPrev(), "Last block should be available");
+      assertThrows(CorruptedLogFileException.class, () -> {
+        reader.prev();
+      });
+    }
   }
 
   @ParameterizedTest
@@ -1882,28 +1882,28 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
-    HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
-        fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
-        bufferSize, readBlocksLazily, true);
+    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
+    try (HoodieLogFileReader reader =
+             new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
 
-    assertTrue(reader.hasPrev(), "Third block should be available");
-    reader.moveToPrev();
+      assertTrue(reader.hasPrev(), "Third block should be available");
+      reader.moveToPrev();
 
-    assertTrue(reader.hasPrev(), "Second block should be available");
-    reader.moveToPrev();
+      assertTrue(reader.hasPrev(), "Second block should be available");
+      reader.moveToPrev();
 
-    // After moving twice, this last reader.prev() should read the First block written
-    assertTrue(reader.hasPrev(), "First block should be available");
-    HoodieLogBlock prevBlock = reader.prev();
-    HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
-    List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords1.size(), recordsRead.size(),
-        "Read records size should be equal to the written records size");
-    assertEquals(copyOfRecords1, recordsRead,
-        "Both records lists should be the same. (ordering guaranteed)");
+      // After moving twice, this last reader.prev() should read the First block written
+      assertTrue(reader.hasPrev(), "First block should be available");
+      HoodieLogBlock prevBlock = reader.prev();
+      HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
+      List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
+      assertEquals(copyOfRecords1.size(), recordsRead.size(),
+          "Read records size should be equal to the written records size");
+      assertEquals(copyOfRecords1, recordsRead,
+          "Both records lists should be the same. (ordering guaranteed)");
 
-    assertFalse(reader.hasPrev());
-    reader.close();
+      assertFalse(reader.hasPrev());
+    }
   }
 
   @Test
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index a403f92..c80c5e2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -99,15 +99,6 @@ public class FileCreateUtils {
     return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType);
   }
 
-  private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
-    Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
-    Files.createDirectories(parentPath);
-    Path metaFilePath = parentPath.resolve(instantTime + suffix);
-    if (Files.notExists(metaFilePath)) {
-      Files.createFile(metaFilePath);
-    }
-  }
-
   private static void createMetaFile(String basePath, String instantTime, String suffix, FileSystem fs) throws IOException {
     org.apache.hadoop.fs.Path parentPath = new org.apache.hadoop.fs.Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     if (!fs.exists(parentPath)) {
@@ -119,12 +110,20 @@ public class FileCreateUtils {
     }
   }
 
+  private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
+    createMetaFile(basePath, instantTime, suffix, "".getBytes()); // zero-length content: delegate to the byte[] overload
+  }
+
   private static void createMetaFile(String basePath, String instantTime, String suffix, byte[] content) throws IOException {
     Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     Files.createDirectories(parentPath);
     Path metaFilePath = parentPath.resolve(instantTime + suffix);
     if (Files.notExists(metaFilePath)) {
-      Files.write(metaFilePath, content);
+      if (content.length == 0) { // no payload supplied: just create the (empty) file
+        Files.createFile(metaFilePath);
+      } else { // otherwise persist the supplied payload as the file's content
+        Files.write(metaFilePath, content);
+      }
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index fa6998b..881197e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -117,8 +117,12 @@ import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 
 public class HoodieTestTable {
 
+  public static final String PHONY_TABLE_SCHEMA =
+      "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", \"name\": \"PhonyRecord\", \"fields\": []}";
+
   private static final Logger LOG = LogManager.getLogger(HoodieTestTable.class);
   private static final Random RANDOM = new Random();
+
   protected static HoodieTestTableState testTableState;
   private final List<String> inflightCommits = new ArrayList<>();
 
@@ -215,7 +219,7 @@ public class HoodieTestTable {
       writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap));
     }
     Map<String, String> extraMetadata = createImmutableMap("test", "test");
-    return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action);
+    return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, PHONY_TABLE_SCHEMA, action);
   }
 
   public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
@@ -779,7 +783,7 @@ public class HoodieTestTable {
       this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
     }
     HoodieReplaceCommitMetadata replaceMetadata =
-        (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING,
+        (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, PHONY_TABLE_SCHEMA,
             REPLACE_COMMIT_ACTION);
     addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata);
     return replaceMetadata;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
new file mode 100644
index 0000000..ea44170
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, tryUnpackNonNullVal}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+import scala.collection.immutable.TreeSet
+
+/**
+ * Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
+ * providing convenient interfaces to read it, transpose, etc
+ */
+trait ColumnStatsIndexSupport {
+  // Loads Column Stats partition of the Metadata Table, keeping only non-null column-stats records projected onto stats fields
+  def readColumnStatsIndex(spark: SparkSession, metadataTablePath: String): DataFrame = {
+    val targetColStatsIndexColumns = Seq(
+      HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+      HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+      HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+      HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+    val requiredMetadataIndexColumns =
+      (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+        s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+    val metadataTableDF = spark.read.format("org.apache.hudi")
+      .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+    // TODO filter on (column, partition) prefix
+    val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+      .select(requiredMetadataIndexColumns.map(col): _*)
+
+    colStatsDF
+  }
+
+  /**
+   * Transposes and converts the raw table format of the Column Stats Index representation,
+   * where each row/record corresponds to individual (column, file) pair, into the table format
+   * where each row corresponds to single file with statistic for individual columns collated
+   * w/in such row:
+   *
+   * Metadata Table Column Stats Index format:
+   *
+   * <pre>
+   *  +---------------------------+------------+------------+------------+-------------+
+   *  |        fileName           | columnName |  minValue  |  maxValue  |  nullCount  |
+   *  +---------------------------+------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          A |          1 |         10 |           0 |
+   *  | another_base_file.parquet |          A |        -10 |          0 |           5 |
+   *  +---------------------------+------------+------------+------------+-------------+
+   * </pre>
+   *
+   * Returned table format
+   *
+   * <pre>
+   *  +---------------------------+------------+------------+-------------+
+   *  |        fileName           | A_minValue | A_maxValue | A_num_nulls |
+   *  +---------------------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          1 |         10 |           0 |
+   *  | another_base_file.parquet |        -10 |          0 |           5 |
+   *  +---------------------------+------------+------------+-------------+
+   * </pre>
+   *
+   * NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
+   *       query at hand might only be referencing a handful of those. As such, only the records
+   *       corresponding to the columns listed in `targetColumns` are transposed, while records of
+   *       all other columns are filtered out
+   *
+   * @param spark Spark session ref
+   * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+   * @param targetColumns target columns to be included into the final table (each is looked up by name among `tableSchema` fields)
+   * @param tableSchema schema of the source data table
+   * @return reshaped table according to the format outlined above
+   */
+  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+    val colStatsSchema = colStatsDF.schema
+    val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
+      case (field, ordinal) => (field.name, ordinal)
+    }).toMap
+
+    val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
+
+    // NOTE: We're sorting the columns to make sure final index schema matches layout
+    //       of the transposed table
+    val sortedColumns = TreeSet(targetColumns: _*)
+
+    val transposedRDD = colStatsDF.rdd
+      .filter(row => sortedColumns.contains(row.getString(colStatsSchemaOrdinalsMap("columnName"))))
+      .map { row =>
+        val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("minValue")))
+        val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("maxValue")))
+
+        val colName = row.getString(colStatsSchemaOrdinalsMap("columnName"))
+        val colType = tableSchemaFieldMap(colName).dataType
+
+        val rowValsSeq = row.toSeq.toArray
+
+        rowValsSeq(colStatsSchemaOrdinalsMap("minValue")) = deserialize(minValue, colType)
+        rowValsSeq(colStatsSchemaOrdinalsMap("maxValue")) = deserialize(maxValue, colType)
+
+        Row(rowValsSeq:_*)
+      }
+      .groupBy(r => r.getString(colStatsSchemaOrdinalsMap("fileName")))
+      .foldByKey(Seq[Row]()) {
+        case (_, columnRows) =>
+          // Rows seq is always non-empty (otherwise there would be no group for the file)
+          val fileName = columnRows.head.get(colStatsSchemaOrdinalsMap("fileName"))
+          val coalescedRowValuesSeq = columnRows.toSeq
+            // NOTE: It's crucial to maintain appropriate ordering of the columns
+            //       matching table layout
+            .sortBy(_.getString(colStatsSchemaOrdinalsMap("columnName")))
+            .foldLeft(Seq[Any](fileName)) {
+              case (acc, columnRow) =>
+                acc ++ Seq("minValue", "maxValue", "nullCount").map(ord => columnRow.get(colStatsSchemaOrdinalsMap(ord)))
+            }
+
+          Seq(Row(coalescedRowValuesSeq:_*))
+      }
+      .values
+      .flatMap(it => it)
+
+    // NOTE: It's crucial to maintain appropriate ordering of the columns
+    //       matching table layout: hence, index schema is composed from the sorted columns
+    //       cherry-picked one by one, instead of simply filtering the fields of the table schema
+    val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema)
+
+    spark.createDataFrame(transposedRDD, indexSchema)
+  }
+}
+
+object ColumnStatsIndexSupport {
+
+  private val COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "fileName"
+  private val COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue"
+  private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
+  private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
+
+  /**
+   * Composes transposed index's schema: file-name column, then min/max/num_nulls columns of every target column in given order (@VisibleForTesting)
+   */
+  def composeIndexSchema(targetColumnNames: Seq[String], tableSchema: StructType): StructType = {
+    val fileNameField = StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType, nullable = true, Metadata.empty)
+    val targetFields = targetColumnNames.map(colName => tableSchema.fields.find(f => f.name == colName).get)
+
+    StructType(
+      targetFields.foldLeft(Seq(fileNameField)) {
+        case (acc, field) =>
+          acc ++ Seq(
+            composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, field.dataType),
+            composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, field.dataType),
+            composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType))
+      }
+    )
+  }
+
+  @inline def getMinColumnNameFor(colName: String): String =
+    formatColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME)
+
+  @inline def getMaxColumnNameFor(colName: String): String =
+    formatColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME)
+
+  @inline def getNumNullsColumnNameFor(colName: String): String =
+    formatColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME)
+
+  @inline private def formatColName(col: String, statName: String) = { // TODO add escaping (presumably for column names; confirm intent)
+    String.format("%s_%s", col, statName)
+  }
+
+  @inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
+    StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
+  // Picks first non-null member of the stat struct, returning (unwrapped value, member ordinal), or (null, -1) if there's none
+  private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
+    Option(statStruct).map(_.toSeq).getOrElse(Seq.empty).zipWithIndex
+      .find(_._1 != null)
+      // NOTE: First non-null value will be a wrapper (converted into Row), bearing a single
+      //       value
+      .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
+      .getOrElse((null, -1))
+  // Decodes stat value from its Column Stats Index encoding into the representation matching the column's Spark type
+  private def deserialize(value: Any, dataType: DataType): Any = {
+    if (value == null) null else dataType match {
+      // NOTE: Absent (null) stat is returned as is by the guard above, since unboxing null into Long/Int
+      //       would silently produce 0. Since we can't rely on Avro's "date", and "timestamp-micros" logical-types,
+      //       those are encoded as int and long w/in the Column Stats Index and have to be decoded back here.
+      case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
+      case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
+
+      // NOTE: All integral types of size less than Int are encoded as Ints in MT
+      case ShortType => value.asInstanceOf[Int].toShort
+      case ByteType => value.asInstanceOf[Int].toByte
+
+      case _ => value
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 7fa3e93..82cd1f4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -24,19 +24,17 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, MetadataPartitionType}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlCommonUtils}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.text.SimpleDateFormat
@@ -81,7 +79,8 @@ case class HoodieFileIndex(spark: SparkSession,
     specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
     fileStatusCache = fileStatusCache
   )
-    with FileIndex {
+    with FileIndex
+    with ColumnStatsIndexSupport {
 
   override def rootPaths: Seq[Path] = queryPaths.asScala
 
@@ -202,61 +201,12 @@ case class HoodieFileIndex(spark: SparkSession,
     if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
       Option.empty
     } else {
-      val targetColStatsIndexColumns = Seq(
-        HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
-        HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
-        HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
-        HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-
-      val requiredMetadataIndexColumns =
-        (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
-          s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
-
-      // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-      val metadataTableDF = spark.read.format("org.apache.hudi")
-        .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
-
-      // TODO filter on (column, partition) prefix
-      val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
-        .select(requiredMetadataIndexColumns.map(col): _*)
-
+      val colStatsDF: DataFrame = readColumnStatsIndex(spark, metadataTablePath)
       val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
 
       // Persist DF to avoid re-computing column statistics unraveling
       withPersistence(colStatsDF) {
-        // Metadata Table bears rows in the following format
-        //
-        //  +---------------------------+------------+------------+------------+-------------+
-        //  |        fileName           | columnName |  minValue  |  maxValue  |  num_nulls  |
-        //  +---------------------------+------------+------------+------------+-------------+
-        //  | one_base_file.parquet     |          A |          1 |         10 |           0 |
-        //  | another_base_file.parquet |          A |        -10 |          0 |           5 |
-        //  +---------------------------+------------+------------+------------+-------------+
-        //
-        // While Data Skipping utils are expecting following (transposed) format, where per-column stats are
-        // essentially transposed (from rows to columns):
-        //
-        //  +---------------------------+------------+------------+-------------+
-        //  |          file             | A_minValue | A_maxValue | A_num_nulls |
-        //  +---------------------------+------------+------------+-------------+
-        //  | one_base_file.parquet     |          1 |         10 |           0 |
-        //  | another_base_file.parquet |        -10 |          0 |           5 |
-        //  +---------------------------+------------+------------+-------------+
-        //
-        // NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
-        //       query at hand might only be referencing a handful of those. As such, we collect all the
-        //       column references from the filtering expressions, and only transpose records corresponding to the
-        //       columns referenced in those
-        val transposedColStatsDF =
-          queryReferencedColumns.map(colName =>
-            colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
-              .select(targetColStatsIndexColumns.map(col): _*)
-              .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
-              .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
-              .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
-          )
-            .reduceLeft((left, right) =>
-              left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
+        val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
 
         // Persist DF to avoid re-computing column statistics unraveling
         withPersistence(transposedColStatsDF) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
similarity index 79%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
index d5d9587..3b0fcf0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
@@ -15,14 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.sql
 
-import org.apache.spark.sql.types.{DataType, NumericType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, NumericType, StringType}
 
 // TODO unify w/ DataTypeUtils
 object HoodieSparkTypeUtils {
 
   /**
+   * Returns whether DecimalType `one` is wider than `another`. If yes, it means `another`
+   * can be casted into `one` safely without losing any precision or range.
+   */
+  def isWiderThan(one: DecimalType, another: DecimalType): Boolean =
+    one.isWiderThan(another)
+
+  /**
    * Checks whether casting expression of [[from]] [[DataType]] to [[to]] [[DataType]] will
    * preserve ordering of the elements
    */
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
similarity index 62%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
index a21a296..8995701 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,11 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.sql
 
 import org.apache.hudi.HoodieUnsafeRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 
 /**
@@ -27,6 +29,10 @@ import org.apache.spark.util.MutablePair
  */
 object HoodieUnsafeRDDUtils {
 
+  // TODO scala-doc
+  def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
+    spark.internalCreateDataFrame(rdd, structType)
+
   /**
    * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly
    * copied [[Array]] of [[InternalRow]]s
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b7ddd28..bdaddd3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
similarity index 80%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index d34480c..a60fac2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.index.columnstats;
+package org.apache.hudi;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -41,17 +41,15 @@ import org.apache.spark.sql.types.DoubleType;
 import org.apache.spark.sql.types.FloatType;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.ShortType;
 import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.util.SerializableConfiguration;
 import scala.collection.JavaConversions;
+import scala.collection.JavaConverters$;
 
 import javax.annotation.Nonnull;
 import java.math.BigDecimal;
@@ -63,49 +61,9 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+// TODO merge w/ ColumnStatsIndexSupport
 public class ColumnStatsIndexHelper {
 
-  private static final String COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "file";
-  private static final String COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue";
-  private static final String COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue";
-  private static final String COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls";
-
-  public static String getMinColumnNameFor(String colName) {
-    return composeZIndexColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME);
-  }
-
-  public static String getMaxColumnNameFor(String colName) {
-    return composeZIndexColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME);
-  }
-
-  public static String getNumNullsColumnNameFor(String colName) {
-    return composeZIndexColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME);
-  }
-
-  /**
-   * @VisibleForTesting
-   */
-  @Nonnull
-  public static StructType composeIndexSchema(@Nonnull List<StructField> zorderedColumnsSchemas) {
-    List<StructField> schema = new ArrayList<>();
-    schema.add(new StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty()));
-    zorderedColumnsSchemas.forEach(colSchema -> {
-      schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType()));
-      schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType()));
-      schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$));
-    });
-    return StructType$.MODULE$.apply(schema);
-  }
-
-  private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) {
-    return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty());
-  }
-
-  private static String composeZIndexColName(String col, String statName) {
-    // TODO add escaping for
-    return String.format("%s_%s", col, statName);
-  }
-
   public static Pair<Object, Object>
       fetchMinMaxValues(
           @Nonnull DataType colType,
@@ -199,10 +157,16 @@ public class ColumnStatsIndexHelper {
     SparkContext sc = sparkSession.sparkContext();
     JavaSparkContext jsc = new JavaSparkContext(sc);
 
+    List<String> columnNames = orderedColumnSchemas.stream()
+        .map(StructField::name)
+        .collect(Collectors.toList());
+
     SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
     int numParallelism = (baseFilesPaths.size() / 3 + 1);
-    List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
+
     String previousJobDescription = sc.getLocalProperty("spark.job.description");
+
+    List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
     try {
       jsc.setJobDescription("Listing parquet column statistics");
       colMinMaxInfos =
@@ -215,9 +179,7 @@ public class ColumnStatsIndexHelper {
                         utils.readRangeFromParquetMetadata(
                                 serializableConfiguration.value(),
                                 new Path(path),
-                                orderedColumnSchemas.stream()
-                                    .map(StructField::name)
-                                    .collect(Collectors.toList())
+                                columnNames
                             )
                             .stream()
                     )
@@ -274,7 +236,10 @@ public class ColumnStatsIndexHelper {
             })
             .filter(Objects::nonNull);
 
-    StructType indexSchema = composeIndexSchema(orderedColumnSchemas);
+    StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
+        JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
+        StructType$.MODULE$.apply(orderedColumnSchemas)
+    );
 
     return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json
new file mode 100644
index 0000000..1ed929c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":769,"c1_minValue":309,"c1_num_nulls":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_num_nulls":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.543-08:00","c4_minValue":"2021-11-19T20:40:55.521-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":32,"c5_num_nulls":0,"c6_maxValue":"2020-11-14","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"uQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":932,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_num_nulls":0,"c5_maxValue":94,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":943,"c1_minValue":89,"c1_num_nulls":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_num_nulls":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_num_nulls":0,"c5_maxValue":95,"c5_minValue":10,"c5_num_nulls":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":959,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_num_nulls":0,"c5_maxValue":97,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json
new file mode 100644
index 0000000..b5486d1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json
@@ -0,0 +1,8 @@
+{"c1_maxValue":568,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.193-08:00","c4_minValue":"2021-11-18T23:34:44.159-08:00","c4_num_nulls":0,"c5_maxValue":58,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-08","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"9g==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":715,"c1_minValue":76,"c1_num_nulls":0,"c2_maxValue":" 76sdc","c2_minValue":" 224sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.199-08:00","c4_minValue":"2021-11-18T23:34:44.166-08:00","c4_num_nulls":0,"c5_maxValue":73,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
+{"c1_maxValue":768,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 768sdc","c2_minValue":" 118sdc","c2_num_nulls":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.164-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-05-04","c6_num_nulls":0,"c7_maxValue":"zw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
+{"c1_maxValue":769,"c1_minValue":309,"c1_num_nulls":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_num_nulls":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.543-08:00","c4_minValue":"2021-11-19T20:40:55.521-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":32,"c5_num_nulls":0,"c6_maxValue":"2020-11-14","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"uQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":770,"c1_minValue":129,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.169-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_mi [...]
+{"c1_maxValue":932,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_num_nulls":0,"c5_maxValue":94,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":943,"c1_minValue":89,"c1_num_nulls":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_num_nulls":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_num_nulls":0,"c5_maxValue":95,"c5_minValue":10,"c5_num_nulls":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":959,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_num_nulls":0,"c5_maxValue":97,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
deleted file mode 100644
index 00d16c6..0000000
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
-{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
deleted file mode 100644
index a633e31..0000000
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index 07c1011..10b4faf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
+import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
 import org.apache.spark.sql.functions.{col, lower}
@@ -35,24 +35,22 @@ import java.sql.Timestamp
 import scala.collection.JavaConverters._
 
 // NOTE: Only A, B columns are indexed
-case class IndexRow(
-  file: String,
+case class IndexRow(fileName: String,
 
-  // Corresponding A column is LongType
-  A_minValue: Long = -1,
-  A_maxValue: Long = -1,
-  A_num_nulls: Long = -1,
+                    // Corresponding A column is LongType
+                    A_minValue: Long = -1,
+                    A_maxValue: Long = -1,
+                    A_num_nulls: Long = -1,
 
-  // Corresponding B column is StringType
-  B_minValue: String = null,
-  B_maxValue: String = null,
-  B_num_nulls: Long = -1,
+                    // Corresponding B column is StringType
+                    B_minValue: String = null,
+                    B_maxValue: String = null,
+                    B_num_nulls: Long = -1,
 
-  // Corresponding B column is TimestampType
-  C_minValue: Timestamp = null,
-  C_maxValue: Timestamp = null,
-  C_num_nulls: Long = -1
-) {
+                    // Corresponding C column is TimestampType
+                    C_minValue: Timestamp = null,
+                    C_maxValue: Timestamp = null,
+                    C_num_nulls: Long = -1) {
   def toRow: Row = Row(productIterator.toSeq: _*)
 }
 
@@ -79,29 +77,27 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
       )
     )
 
-  val indexSchema: StructType =
-    ColumnStatsIndexHelper.composeIndexSchema(
-      sourceTableSchema.fields.toSeq
-        .filter(f => indexedCols.contains(f.name))
-        .asJava
-    )
+  val indexSchema: StructType = composeIndexSchema(indexedCols, sourceTableSchema)
 
   @ParameterizedTest
   @MethodSource(
     Array(
-        "testBasicLookupFilterExpressionsSource",
-        "testAdvancedLookupFilterExpressionsSource",
-        "testCompositeFilterExpressionsSource"
+      "testBasicLookupFilterExpressionsSource",
+      "testAdvancedLookupFilterExpressionsSource",
+      "testCompositeFilterExpressionsSource"
     ))
   def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = {
+    // We have to fix the timezone to make sure the output of all date-bound utilities
+    // is consistent with the fixtures
     spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")
+
     val resolvedExpr: Expression = exprUtils.resolveExpr(spark, sourceExpr, sourceTableSchema)
     val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
 
     val indexDf = spark.createDataFrame(input.map(_.toRow).asJava, indexSchema)
 
     val rows = indexDf.where(new Column(lookupFilter))
-      .select("file")
+      .select("fileName")
       .collect()
       .map(_.getString(0))
       .toSeq
@@ -121,7 +117,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     val indexDf = spark.createDataset(input)
 
     val rows = indexDf.where(new Column(lookupFilter))
-      .select("file")
+      .select("fileName")
       .collect()
       .map(_.getString(0))
       .toSeq
@@ -340,7 +336,7 @@ object TestDataSkippingUtils {
 
       arguments(
         // Filter out all rows that contain A = 0 AND B = 'abc'
-      "A != 0 OR B != 'abc'",
+        "A != 0 OR B != 'abc'",
         Seq(
           IndexRow("file_1", 1, 2, 0),
           IndexRow("file_2", -1, 1, 0),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 7c20be6..5d10a1d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -20,25 +20,28 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
+import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
-import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test}
+import org.junit.jupiter.api._
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-// TODO repurpose to test Column Stats in Metadata Table
-@Disabled
 @Tag("functional")
-class TestColumnStatsIndex extends HoodieClientTestBase {
+class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSupport {
   var spark: SparkSession = _
 
   val sourceTableSchema =
@@ -67,140 +70,98 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
   }
 
   @Test
-  def testZIndexTableComposition(): Unit = {
-    val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
-    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
+  def testMetadataColumnStatsIndex(): Unit = {
+    val opts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    )
 
-    bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath)
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
 
-    val inputDf =
     // NOTE: Schema here is provided for validation that the input date is in the appropriate format
-      spark.read
-        .schema(sourceTableSchema)
-        .parquet(targetParquetTablePath)
-
-    val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
-    val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
-
-    // {@link TimestampType} is not supported, and will throw -- hence skipping "c4"
-    val newZIndexTableDf = null
-//      ColumnStatsIndexHelper.buildColumnStatsTableFor(
-//        inputDf.sparkSession,
-//        inputDf.inputFiles.toSeq.asJava,
-//        zorderedColsSchemaFields.asJava
-//      )
-
-    val indexSchema =
-      ColumnStatsIndexHelper.composeIndexSchema(
-        sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
-      )
-
-    // Collect Z-index stats manually (reading individual Parquet files)
-    val manualZIndexTableDf =
-      buildColumnStatsTableManually(targetParquetTablePath, zorderedCols, indexSchema)
-
-    // NOTE: Z-index is built against stats collected w/in Parquet footers, which will be
-    //       represented w/ corresponding Parquet schema (INT, INT64, INT96, etc).
-    //
-    //       When stats are collected manually, produced Z-index table is inherently coerced into the
-    //       schema of the original source Parquet base-file and therefore we have to similarly coerce newly
-    //       built Z-index table (built off Parquet footers) into the canonical index schema (built off the
-    //       original source file schema)
-    assertEquals(asJson(sort(manualZIndexTableDf)), asJson(sort(newZIndexTableDf)))
-
-    // Match against expected Z-index table
-    val expectedZIndexTableDf =
-      spark.read
-        .schema(indexSchema)
-        .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
+    val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
 
-    assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(replace(newZIndexTableDf))))
-  }
+    inputDF
+      .sort("c1")
+      .repartition(4, new Column("c1"))
+      .write
+      .format("hudi")
+      .options(opts)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
 
-  @Test
-  def testZIndexTableMerge(): Unit = {
-    val testZIndexPath = new Path(basePath, "zindex")
-
-    val firstParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
-    val firstJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
-
-    // Bootstrap FIRST source Parquet table
-    bootstrapParquetInputTableFromJSON(firstJSONTablePath, firstParquetTablePath)
-
-    val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
-    val indexSchema =
-      ColumnStatsIndexHelper.composeIndexSchema(
-        sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
-      )
-
-    //
-    // Bootstrap Z-index table
-    //
-
-    val firstCommitInstance = "0"
-    val firstInputDf = spark.read.parquet(firstParquetTablePath)
-
-//    ColumnStatsIndexHelper.updateColumnStatsIndexFor(
-//      firstInputDf.sparkSession,
-//      sourceTableSchema,
-//      firstInputDf.inputFiles.toSeq.asJava,
-//      zorderedCols.asJava,
-//      testZIndexPath.toString,
-//      firstCommitInstance,
-//      Seq().asJava
-//    )
-
-    // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
-    //       to reliably retrieve it
-    val initialZIndexTable =
-    spark.read
-      .parquet(new Path(testZIndexPath, firstCommitInstance).toString)
-
-    val expectedInitialZIndexTableDf =
-      spark.read
-        .schema(indexSchema)
-        .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(replace(initialZIndexTable))))
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
 
-    // Bootstrap SECOND source Parquet table
-    val secondParquetTablePath = tempDir.resolve("index/zorder/another-input-table").toAbsolutePath.toString
-    val secondJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
+    val colStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+    val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
-    bootstrapParquetInputTableFromJSON(secondJSONTablePath, secondParquetTablePath)
+    val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
 
-    val secondCommitInstance = "1"
-    val secondInputDf =
+    // Match against expected column stats table
+    val expectedColStatsIndexTableDf =
       spark.read
-        .schema(sourceTableSchema)
-        .parquet(secondParquetTablePath)
-
-    //
-    // Update Column Stats table
-    //
-
-//    ColumnStatsIndexHelper.updateColumnStatsIndexFor(
-//      secondInputDf.sparkSession,
-//      sourceTableSchema,
-//      secondInputDf.inputFiles.toSeq.asJava,
-//      zorderedCols.asJava,
-//      testZIndexPath.toString,
-//      secondCommitInstance,
-//      Seq(firstCommitInstance).asJava
-//    )
-
-    // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
-    //       to reliably retrieve it
-    val mergedZIndexTable =
-    spark.read
-      .parquet(new Path(testZIndexPath, secondCommitInstance).toString)
-
-    val expectedMergedZIndexTableDf =
+        .schema(expectedColStatsSchema)
+        .json(getClass.getClassLoader.getResource("index/zorder/column-stats-index-table.json").toString)
+
+    assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
+    // NOTE: We have to drop the `fileName` column as it contains semi-random components
+    //       that we can't control in this test. Nevertheless, since we manually verify composition of the
+    //       ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+    assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
+
+    // Collect Column Stats manually (reading individual Parquet files)
+    val manualColStatsTableDF =
+      buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+
+    assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+
+    // Append a second batch (insert operation, SaveMode.Append) and validate the updated index
+    val updateJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
+    val updateDF = spark.read
+      .schema(sourceTableSchema)
+      .json(updateJSONTablePath)
+
+    updateDF.repartition(4)
+      .write
+      .format("hudi")
+      .options(opts)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    val updatedColStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+    val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+
+    val expectedColStatsIndexUpdatedDF =
       spark.read
-        .schema(indexSchema)
-        .json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString)
+        .schema(expectedColStatsSchema)
+        .json(getClass.getClassLoader.getResource("index/zorder/updated-column-stats-index-table.json").toString)
 
-    assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(replace(mergedZIndexTable))))
+    assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+    assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+
+    // Collect Column Stats manually (reading individual Parquet files)
+    val manualUpdatedColStatsTableDF =
+      buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+
+    assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
   }
 
   @Test
@@ -249,7 +210,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       while (it.hasNext) {
         seq = seq :+ it.next()
       }
-      seq
+      seq.filter(fs => fs.getPath.getName.endsWith(".parquet"))
     }
 
     spark.createDataFrame(
@@ -296,23 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
     fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
   }
 
-  def replace(ds: Dataset[Row]): DataFrame = {
-    val uuidRegexp = "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}"
-
-    val uuids =
-      ds.selectExpr(s"regexp_extract(file, '(${uuidRegexp})')")
-        .distinct()
-        .collect()
-        .map(_.getString(0))
-
-    val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => {
-      val uuid = uuids.find(uuid => fileName.contains(uuid)).get
-      fileName.replace(uuid, "xxx")
-    })
-
-    ds.withColumn("file", uuidToIdx(ds("file")))
-  }
-
   private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
     val sourceTableSchema =
       new StructType()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 918202e..ead6358 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -51,6 +51,8 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
 
     val opts: Map[String, String] = commonOpts ++ Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
       HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
     )
 
@@ -74,22 +76,29 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val metadataDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata")
+    // "files" partition of the Metadata Table (MT)
+    val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
 
     // Smoke test
-    metadataDF.show()
+    filesPartitionDF.show()
 
     // Query w/ 0 requested columns should be working fine
-    assertEquals(4, metadataDF.count())
+    assertEquals(4, filesPartitionDF.count())
 
     val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", "__all_partitions__")
-    val keys = metadataDF.select("key")
+    val keys = filesPartitionDF.select("key")
       .collect()
       .map(_.getString(0))
       .toSeq
       .sorted
 
     assertEquals(expectedKeys, keys)
+
+    // Column Stats Index ("column_stats") partition of the Metadata Table
+    val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+
+    // Smoke test
+    colStatsDF.show()
   }
 
   private def parseRecords(records: Seq[String]) = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index fff92bc..2cdd788 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -27,10 +27,9 @@ import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
-import org.apache.spark.HoodieUnsafeRDDUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 import org.junit.jupiter.api.{Tag, Test}
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
index d6a2453..d84fad4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
@@ -19,8 +19,8 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.ColumnStatsIndexHelper.buildColumnStatsTableFor
 import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
 import org.apache.hudi.sort.SpaceCurveSortingHelper
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.hudi.TestHoodieSqlBase
@@ -38,7 +38,7 @@ object SpaceCurveOptimizeBenchmark extends TestHoodieSqlBase {
 
     val orderedColsTypes = Seq(StructField(co1, IntegerType), StructField(co2, IntegerType))
     val colStatsIndexTable =
-      ColumnStatsIndexHelper.buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes)
+      buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes)
         .collect()
         .map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5)))
 
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
index 3e23335..f81ff74 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 
 object HoodieSpark2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 31deb34..2673088 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -178,6 +178,11 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConverter = newUnionConverter(st, avroType)
+        val numFields = st.length
+        (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
       case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
         val valueConverter = newConverter(
           vt, resolveNullableType(avroType.getValueType, valueContainsNull))
@@ -205,8 +210,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     }
   }
 
-  private def newStructConverter(
-                                  catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
+  private def newStructConverter(catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
     if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
       throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
         s"Avro type $avroStruct.")
@@ -229,14 +233,58 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       result
   }
 
+  private def newUnionConverter(catalystStruct: StructType, avroUnion: Schema): InternalRow => Any = {
+    if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+      throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+        s"Avro type $avroUnion.")
+    }
+    val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+    val avroInnerTypes = if (nullable) {
+      avroUnion.getTypes.asScala.tail
+    } else {
+      avroUnion.getTypes.asScala
+    }
+    val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+      case (f1, f2) => newConverter(f1.dataType, f2)
+    }
+    val numFields = catalystStruct.length
+    (row: InternalRow) =>
+      var i = 0
+      var result: Any = null
+      while (i < numFields) {
+        if (!row.isNullAt(i)) {
+          if (result != null) {
+            throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+              s"Avro union $avroUnion. Record has more than one optional values set")
+          }
+          result = fieldConverters(i).apply(row, i)
+        }
+        i += 1
+      }
+      if (!nullable && result == null) {
+        throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+          s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+      }
+      result
+  }
+
+  private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+    (avroStruct.getTypes.size() > 0 &&
+      avroStruct.getTypes.get(0).getType == Type.NULL &&
+      avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+  }
+
   private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
     if (nullable && avroType.getType != NULL) {
-      // avro uses union to represent nullable type.
+      // Avro uses union to represent nullable type.
       val fields = avroType.getTypes.asScala
-      assert(fields.length == 2)
       val actualType = fields.filter(_.getType != Type.NULL)
-      assert(actualType.length == 1)
-      actualType.head
+      if (fields.length == 2 && actualType.length == 1) {
+        actualType.head
+      } else {
+        // This is just a normal union, not used to designate nullability
+        avroType
+      }
     } else {
       avroType
     }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
index cb9c31f..3e65123 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
@@ -18,7 +18,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 
 object HoodieSpark3_1CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index b423f9b..36d86c1 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -196,6 +196,11 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConverter = newUnionConverter(st, avroType)
+        val numFields = st.length
+        (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
       case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
         val valueConverter = newConverter(
           vt, resolveNullableType(avroType.getValueType, valueContainsNull))
@@ -223,8 +228,7 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
     }
   }
 
-  private def newStructConverter(
-                                  catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
+  private def newStructConverter(catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
     if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
       throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
         s"Avro type $avroStruct.")
@@ -258,6 +262,47 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       result
   }
 
+  private def newUnionConverter(catalystStruct: StructType, avroUnion: Schema): InternalRow => Any = {
+    if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+      throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+        s"Avro type $avroUnion.")
+    }
+    val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+    val avroInnerTypes = if (nullable) {
+      avroUnion.getTypes.asScala.tail
+    } else {
+      avroUnion.getTypes.asScala
+    }
+    val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+      case (f1, f2) => newConverter(f1.dataType, f2)
+    }
+    val numFields = catalystStruct.length
+    (row: InternalRow) =>
+      var i = 0
+      var result: Any = null
+      while (i < numFields) {
+        if (!row.isNullAt(i)) {
+          if (result != null) {
+            throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+              s"Avro union $avroUnion. Record has more than one optional values set")
+          }
+          result = fieldConverters(i).apply(row, i)
+        }
+        i += 1
+      }
+      if (!nullable && result == null) {
+        throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+          s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+      }
+      result
+  }
+
+  private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+    (avroStruct.getTypes.size() > 0 &&
+      avroStruct.getTypes.get(0).getType == Type.NULL &&
+      avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+  }
+
   /**
    * Resolve a possibly nullable Avro Type.
    *
@@ -285,12 +330,12 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
     if (avroType.getType == Type.UNION) {
       val fields = avroType.getTypes.asScala
       val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      if (fields.length == 2 && actualType.length == 1) {
+        (true, actualType.head)
+      } else {
+        // This is just a normal union, not used to designate nullability
+        (false, avroType)
       }
-      (true, actualType.head)
     } else {
       (false, avroType)
     }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
index 8e056c0..fc8c957 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 
 object HoodieSpark3_2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2fe51d3..73267f4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -211,6 +211,11 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         val numFields = st.length
         (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
 
+      case (st: StructType, UNION) =>
+        val unionConverter = newUnionConverter(st, avroType, catalystPath, avroPath)
+        val numFields = st.length
+        (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
       case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
         val valueConverter = newConverter(
           vt, resolveNullableType(avroType.getValueType, valueContainsNull),
@@ -288,6 +293,50 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       result
   }
 
+  private def newUnionConverter(catalystStruct: StructType,
+                                avroUnion: Schema,
+                                catalystPath: Seq[String],
+                                avroPath: Seq[String]): InternalRow => Any = {
+    if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+      throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+        s"Avro type $avroUnion.")
+    }
+    val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+    val avroInnerTypes = if (nullable) {
+      avroUnion.getTypes.asScala.tail
+    } else {
+      avroUnion.getTypes.asScala
+    }
+    val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+      case (f1, f2) => newConverter(f1.dataType, f2, catalystPath, avroPath)
+    }
+    val numFields = catalystStruct.length
+    (row: InternalRow) =>
+      var i = 0
+      var result: Any = null
+      while (i < numFields) {
+        if (!row.isNullAt(i)) {
+          if (result != null) {
+            throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+              s"Avro union $avroUnion. Record has more than one optional values set")
+          }
+          result = fieldConverters(i).apply(row, i)
+        }
+        i += 1
+      }
+      if (!nullable && result == null) {
+        throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+          s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+      }
+      result
+  }
+
+  private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+    (avroStruct.getTypes.size() > 0 &&
+      avroStruct.getTypes.get(0).getType == Type.NULL &&
+      avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+  }
+
   /**
    * Resolve a possibly nullable Avro Type.
    *
@@ -315,12 +364,12 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
     if (avroType.getType == Type.UNION) {
       val fields = avroType.getTypes.asScala
       val actualType = fields.filter(_.getType != Type.NULL)
-      if (fields.length != 2 || actualType.length != 1) {
-        throw new UnsupportedAvroTypeException(
-          s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
-            "type is supported")
+      if (fields.length == 2 && actualType.length == 1) {
+        (true, actualType.head)
+      } else {
+        // This is just a normal union, not used to designate nullability
+        (false, avroType)
       }
-      (true, actualType.head)
     } else {
       (false, avroType)
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5a11570..2f88809 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -51,6 +51,7 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
@@ -660,6 +661,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         }).collect(Collectors.toList());
   }
 
+  @SuppressWarnings("rawtypes")
   private void validateAllColumnStats(
       HoodieMetadataValidationContext metadataTableBasedContext,
       HoodieMetadataValidationContext fsBasedContext,
@@ -667,9 +669,9 @@ public class HoodieMetadataTableValidator implements Serializable {
       Set<String> baseDataFilesForCleaning) {
 
     List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
-    List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
+    List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext
         .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
-    List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
+    List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext
         .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
 
     validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
@@ -777,10 +779,10 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   public static class HoodieColumnRangeMetadataComparator
-      implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+      implements Comparator<HoodieColumnRangeMetadata<Comparable>>, Serializable { // Serializable comparator over column-range metadata typed with raw Comparable.
 
     @Override
-    public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+    public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMetadata<Comparable> o2) { // Orders the two entries by comparing their toString() output as Strings.
       return o1.toString().compareTo(o2.toString());
     }
   }
@@ -837,7 +839,8 @@ public class HoodieMetadataTableValidator implements Serializable {
           .sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
     }
 
-    public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
         String partitionPath, List<String> baseFileNameList) {
       LOG.info("All column names for getting column stats: " + allColumnNameList);
       if (enableMetadataTable) {
@@ -846,15 +849,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         return allColumnNameList.stream()
             .flatMap(columnName ->
                 tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
-                    .map(stats -> HoodieColumnRangeMetadata.create(
-                        stats.getFileName(),
-                        columnName,
-                        stats.getMinValue(),
-                        stats.getMaxValue(),
-                        stats.getNullCount(),
-                        stats.getValueCount(),
-                        stats.getTotalSize(),
-                        stats.getTotalUncompressedSize()))
+                    .map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata)
                     .collect(Collectors.toList())
                     .stream())
             .sorted(new HoodieColumnRangeMetadataComparator())
@@ -865,18 +860,6 @@ public class HoodieMetadataTableValidator implements Serializable {
                     metaClient.getHadoopConf(),
                     new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
                     allColumnNameList).stream())
-            .map(rangeMetadata -> HoodieColumnRangeMetadata.create(
-                rangeMetadata.getFilePath(),
-                rangeMetadata.getColumnName(),
-                // Note: here we ignore the type in the validation,
-                // since column stats from metadata table store the min/max values as String
-                rangeMetadata.getMinValue().toString(),
-                rangeMetadata.getMaxValue().toString(),
-                rangeMetadata.getNullCount(),
-                rangeMetadata.getValueCount(),
-                rangeMetadata.getTotalSize(),
-                rangeMetadata.getTotalUncompressedSize()
-            ))
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
       }