Posted to commits@hudi.apache.org by yi...@apache.org on 2022/07/25 22:36:19 UTC

[hudi] branch master updated: [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c8df7e8b [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping  (#5746)
e7c8df7e8b is described below

commit e7c8df7e8b886261a9b0da7696a75e73eca04f11
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Mon Jul 25 15:36:12 2022 -0700

    [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping  (#5746)
    
    We provide an alternative way of fetching the Column Stats Index within the reading process itself (in-memory), avoiding the penalty of the heavier-weight execution scheduled through the Spark engine.
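    
    As an illustrative sketch (not taken from this commit), the new override could be exercised from a Spark read roughly as follows, assuming the standard "hoodie.metadata" config prefix and a table with the Column Stats Index already built; `basePath` and `col_a` are placeholders:
    
        // Force the Column Stats Index to be read in-process (on the driver)
        // instead of through a Spark job; valid values: "in-memory", "engine"
        val df = spark.read.format("hudi")
          .option("hoodie.metadata.enable", "true")
          .option("hoodie.enable.data.skipping", "true")
          .option("hoodie.metadata.index.column.stats.processing.mode.override", "in-memory")
          .load(basePath)
          .filter("col_a > 100")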
---
 .../scala/org/apache/hudi/util/JFunction.scala     |  32 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   9 +
 .../functional/TestHoodieBackedMetadata.java       |   6 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  12 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |  28 ++
 .../metadata/FileSystemBackedTableMetadata.java    |   2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  82 ++--
 .../apache/hudi/metadata/HoodieTableMetadata.java  |   3 +-
 .../hudi/source/stats/ColumnStatsIndices.java      |   2 +-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  | 481 ++++++++++++++-------
 .../org/apache/hudi/HoodieCatalystUtils.scala      |  65 +++
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   7 +-
 .../scala/org/apache/hudi/HoodieDatasetUtils.scala |  45 --
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 100 ++---
 ...nsafeRDDUtils.scala => HoodieUnsafeUtils.scala} |  45 +-
 .../org/apache/hudi/TestHoodieFileIndex.scala      |  10 +-
 .../hudi/functional/TestColumnStatsIndex.scala     | 168 +++----
 .../functional/TestParquetColumnProjection.scala   |   4 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  22 +
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |  22 +
 20 files changed, 744 insertions(+), 401 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
index 4a7dca8408..8a612f4da2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -17,15 +17,43 @@
 
 package org.apache.hudi.util
 
+import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction}
+import org.apache.hudi.common.util.collection
+
+import scala.language.implicitConversions
+
 /**
  * Utility allowing for seamless conversion b/w Java/Scala functional primitives
  */
 object JFunction {
 
-  def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
+  ////////////////////////////////////////////////////////////
+  // From Java to Scala
+  ////////////////////////////////////////////////////////////
+
+  implicit def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
     (t: T) => f.apply(t)
 
-  def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
+  ////////////////////////////////////////////////////////////
+  // From Scala to Java
+  ////////////////////////////////////////////////////////////
+
+  implicit def toJavaFunction[T, R](f: Function[T, R]): java.util.function.Function[T, R] =
+    new java.util.function.Function[T, R] {
+      override def apply(t: T): R = f.apply(t)
+    }
+
+  implicit def toJavaSerializableFunction[T, R](f: Function[T, R]): SerializableFunction[T, R] =
+    new SerializableFunction[T, R] {
+      override def apply(t: T): R = f.apply(t)
+    }
+
+  implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
+    new SerializablePairFunction[T, K, V] {
+      override def call(t: T): collection.Pair[K, V] = f.apply(t)
+    }
+
+  implicit def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
     new java.util.function.Consumer[T] {
       override def accept(t: T): Unit = f.apply(t)
     }
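
For context, a minimal sketch of how the explicit Scala-to-Java wrappers added above are meant to be used; the Column Stats Index code further below relies on the explicit form, since Scala 2.11 does not always resolve the implicits:

    import org.apache.hudi.common.function.SerializableFunction
    import org.apache.hudi.util.JFunction

    // Wrap a Scala lambda as Hudi's SerializableFunction, e.g. to pass it
    // to HoodieData#map / #filter from Scala code
    val toUpper: SerializableFunction[String, String] =
      JFunction.toJavaSerializableFunction[String, String](s => s.toUpperCase)
    // toUpper.apply("hudi") yields "HUDI"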
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cd30528798..24f4e6117a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -27,12 +27,16 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
 
 import java.util.Locale
 
@@ -138,4 +142,9 @@ trait SparkAdapter extends Serializable {
    * TODO move to HoodieCatalystExpressionUtils
    */
   def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+  /**
+   * Converts instance of [[StorageLevel]] to a corresponding string
+   */
+  def convertStorageLevelToString(level: StorageLevel): String
 }
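
The concrete Spark2Adapter/BaseSpark3Adapter implementations of this method are part of the commit (see the diffstat above) but not included in this excerpt; purely as an assumption of their general shape, a version-agnostic sketch might look roughly like this:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.storage.StorageLevel._

    // Maps well-known StorageLevel singletons back to their canonical names;
    // StorageLevel defines equality, so stable-identifier patterns work here
    def convertStorageLevelToString(level: StorageLevel): String = level match {
      case NONE => "NONE"
      case DISK_ONLY => "DISK_ONLY"
      case MEMORY_ONLY => "MEMORY_ONLY"
      case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
      case MEMORY_AND_DISK => "MEMORY_AND_DISK"
      case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
      case other => throw new IllegalArgumentException(s"Unsupported StorageLevel: $other")
    }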
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e10c372be6..8828ceab6d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1504,7 +1504,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       // prefix search for column (_hoodie_record_key)
       ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
       List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
 
       // there are 3 partitions in total and 2 commits. total entries should be 6.
       assertEquals(result.size(), 6);
@@ -1515,7 +1515,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       // prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
       PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
@@ -1534,7 +1534,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       // prefix search for column {commit time} and first partition
       columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
       result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
 
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index ec70653b9c..0b2c34618e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -62,7 +63,7 @@ import java.util.stream.Collectors;
  *   <li>Query instant/range</li>
  * </ul>
  */
-public abstract class BaseHoodieTableFileIndex {
+public abstract class   BaseHoodieTableFileIndex {
 
   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
@@ -166,6 +167,11 @@ public abstract class BaseHoodieTableFileIndex {
         .collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue));
   }
 
+  public int getFileSlicesCount() {
+    return cachedAllInputFileSlices.values().stream()
+        .mapToInt(List::size).sum();
+  }
+
   protected List<PartitionPath> getAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
@@ -349,10 +355,10 @@ public abstract class BaseHoodieTableFileIndex {
 
     Path fullPartitionPath(String basePath) {
       if (!path.isEmpty()) {
-        return new Path(basePath, path);
+        return new CachingPath(basePath, path);
       }
 
-      return new Path(basePath);
+      return new CachingPath(basePath);
     }
 
     @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 2cd08b9ae9..b16373ef83 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("Comma-separated list of columns for which column stats index will be built. If not set, all columns will be indexed");
 
+  public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY = "in-memory";
+  public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE = "engine";
+
+  public static final ConfigProperty<String> COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
+      .noDefaultValue()
+      .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY, COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
+      .sinceVersion("0.12.0")
+      .withDocumentation("By default Column Stats Index is automatically determining whether it should be read and processed either"
+          + "'in-memory' (w/in executing process) or using Spark (on a cluster), based on some factors like the size of the Index "
+          + "and how many columns are read. This config allows to override this behavior.");
+
+  public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty
+      .key(METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold")
+      .defaultValue(100000)
+      .sinceVersion("0.12.0")
+      .withDocumentation("When reading Column Stats Index, if the size of the expected resulting projection is below the in-memory"
+          + " threshold (counted by the # of rows), it will be attempted to be loaded \"in-memory\" (ie not using the execution engine"
+          + " like Spark, Flink, etc). If the value is above the threshold execution engine will be used to compose the projection.");
+
   public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS = ConfigProperty
       .key(METADATA_PREFIX + ".index.bloom.filter.column.list")
       .noDefaultValue()
@@ -246,6 +266,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
   }
 
+  public String getColumnStatsIndexProcessingModeOverride() {
+    return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
+  }
+
+  public Integer getColumnStatsIndexInMemoryProjectionThreshold() {
+    return getIntOrDefault(COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD);
+  }
+
   public List<String> getColumnsEnabledForBloomFilterIndex() {
     return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
   }
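
A brief sketch of how the two new getters are consumed; the builder API below is assumed from the pre-existing HoodieMetadataConfig surface and the sketch itself is illustrative:

    import org.apache.hudi.common.config.HoodieMetadataConfig

    val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
    // Defaults to 100000 rows unless explicitly overridden
    val threshold: Integer = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
    // Returns null unless ".processing.mode.override" was set ("in-memory" or "engine")
    val modeOverride: String = metadataConfig.getColumnStatsIndexProcessingModeOverride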
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index f029995ba0..9877755b3c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -168,7 +168,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
     throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e96889f044..f8a0389da3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.FileSlice;
@@ -143,10 +144,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   @Override
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
-                                                                                 String partitionName) {
+                                                                                 String partitionName,
+                                                                                 boolean shouldLoadInMemory) {
     // Sort the columns so that keys are looked up in order
-    List<String> sortedkeyPrefixes = new ArrayList<>(keyPrefixes);
-    Collections.sort(sortedkeyPrefixes);
+    List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
+    Collections.sort(sortedKeyPrefixes);
 
     // NOTE: Since we partition records to a particular file-group by full key, we will have
     //       to scan all file-groups for all key-prefixes as each of these might contain some
@@ -154,44 +156,44 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     List<FileSlice> partitionFileSlices =
         HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
 
-    return engineContext.parallelize(partitionFileSlices)
-        .flatMap(
-            (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
-              // NOTE: Since this will be executed by executors, we can't access previously cached
-              //       readers, and therefore have to always open new ones
-              Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
-                  openReaders(partitionName, fileSlice);
-              try {
-                List<Long> timings = new ArrayList<>();
-
-                HoodieFileReader baseFileReader = readers.getKey();
-                HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
-
-                if (baseFileReader == null && logRecordScanner == null) {
-                  // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
-                  return Collections.emptyIterator();
-                }
-
-                boolean fullKeys = false;
-
-                Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-                    readLogRecords(logRecordScanner, sortedkeyPrefixes, fullKeys, timings);
-
-                List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
-                    readFromBaseAndMergeWithLogRecords(baseFileReader, sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName);
-
-                LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
-                    sortedkeyPrefixes.size(), timings));
-
-                return mergedRecords.iterator();
-              } catch (IOException ioe) {
-                throw new HoodieIOException("Error merging records from metadata table for  " + sortedkeyPrefixes.size() + " key : ", ioe);
-              } finally {
-                closeReader(readers);
-              }
+    return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
+        .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
+          // NOTE: Since this will be executed by executors, we can't access previously cached
+          //       readers, and therefore have to always open new ones
+          Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+              openReaders(partitionName, fileSlice);
+
+          try {
+            List<Long> timings = new ArrayList<>();
+
+            HoodieFileReader baseFileReader = readers.getKey();
+            HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+            if (baseFileReader == null && logRecordScanner == null) {
+              // TODO: what do we do if both do not exist? Should we throw an exception and let the caller do the fallback?
+              return Collections.emptyIterator();
             }
-        )
-        .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+
+            boolean fullKeys = false;
+
+            Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+                readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
+
+            List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+                readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+            LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+                sortedKeyPrefixes.size(), timings));
+
+            return mergedRecords.stream()
+                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+                .iterator();
+          } catch (IOException ioe) {
+            throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+          } finally {
+            closeReader(readers);
+          }
+        })
         .filter(Objects::nonNull);
   }
 
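The new flag effectively chooses between HoodieListData (lazy, driver-local) and engineContext.parallelize (on-cluster) for scanning the partition's file slices. A hypothetical call-site sketch, mirroring the test changes above; `metadataTable` and `encodedColumnNames` (a list of base64-encoded ColumnIndexID prefixes) are placeholders:

    import scala.collection.JavaConverters._
    import org.apache.hudi.metadata.MetadataPartitionType

    val records = metadataTable.getRecordsByKeyPrefixes(
      encodedColumnNames.asJava,                            // key prefixes to look up
      MetadataPartitionType.COLUMN_STATS.getPartitionPath,  // column stats partition of the MDT
      true)                                                 // shouldLoadInMemory: process on the driver
    val materialized = records.collectAsList()
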
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index a059b57845..ae871e3be0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -170,7 +170,8 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
    * @return {@link HoodieData} of {@link HoodieRecord}s with records matching the passed in key prefixes.
    */
   HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
-                                                                          String partitionName);
+                                                                          String partitionName,
+                                                                          boolean shouldLoadInMemory);
 
   /**
    * Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index 8ec0eafde3..75e10341db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -319,7 +319,7 @@ public class ColumnStatsIndices {
         .map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
 
     HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
-        metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+        metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);
 
     org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter =
         AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index b1e03f86ff..58511f791e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -17,66 +17,153 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema.Parser
-import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.ColumnStatsIndexSupport._
+import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.avro.model._
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.collection
 import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
 
+import java.nio.ByteBuffer
 import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeSet
+import scala.collection.mutable.ListBuffer
+import scala.collection.parallel.mutable.ParHashMap
+
+class ColumnStatsIndexSupport(spark: SparkSession,
+                              tableSchema: StructType,
+                              @transient metadataConfig: HoodieMetadataConfig,
+                              @transient metaClient: HoodieTableMetaClient,
+                              allowCaching: Boolean = false) {
+
+  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+  @transient private lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
+
+  @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
+
+  // NOTE: Since [[metadataConfig]] is transient, this has to be eagerly persisted before it is passed
+  //       on to the executor
+  private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
+
+  private lazy val indexedColumns: Set[String] = {
+    val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
+    // Column Stats Index could index either
+    //    - The whole table
+    //    - Only configured columns
+    if (customIndexedColumns.isEmpty) {
+      tableSchema.fieldNames.toSet
+    } else {
+      customIndexedColumns.asScala.toSet
+    }
+  }
 
-/**
- * Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
- * providing convenient interfaces to read it, transpose, etc
- */
-trait ColumnStatsIndexSupport extends SparkAdapterSupport {
-
-  def readColumnStatsIndex(spark: SparkSession,
-                           tableBasePath: String,
-                           metadataConfig: HoodieMetadataConfig,
-                           targetColumns: Seq[String] = Seq.empty): DataFrame = {
-    val targetColStatsIndexColumns = Seq(
-      HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
-      HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
-      HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
-      HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
-      HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
-    val requiredMetadataIndexColumns =
-      (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
-        s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
-
-    val metadataTableDF: DataFrame = {
-      // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
-      //       by only fetching Column Stats Index records pertaining to the requested columns.
-      //       Otherwise we fallback to read whole Column Stats Index
-      if (targetColumns.nonEmpty) {
-        readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
-      } else {
-        readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
-      }
+  /**
+   * Returns true in cases when Column Stats Index is built and available as standalone partition
+   * w/in the Metadata Table
+   */
+  def isIndexAvailable: Boolean = {
+    checkState(metadataConfig.enabled, "Metadata Table support has to be enabled")
+    metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+  }
+
+  /**
+   * Determines whether it would be more optimal to read the Column Stats Index a) in memory of the invoking process,
+   * or b) by executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
+   */
+  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
+    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+      case Some(mode) =>
+        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+      case None =>
+        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+    }
+  }
+
+  /**
+   * Loads a view of the Column Stats Index in a transposed format where a single row coalesces every column's
+   * statistics for a single file, returning it as a [[DataFrame]]
+   *
+   * Please check out the scala-doc of the [[transpose]] method explaining this view in more detail
+   */
+  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = {
+    cachedColumnStatsIndexViews.get(targetColumns) match {
+      case Some(cachedDF) =>
+        block(cachedDF)
+
+      case None =>
+        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+          loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+
+        withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
+          val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
+          val df = if (shouldReadInMemory) {
+            // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
+            //       of the transposed table in memory, facilitating execution of the subsequently chained operations
+            //       on it locally (on the driver; all such operations are actually going to be performed by Spark's
+            //       Optimizer)
+            createDataFrameFromRows(spark, transposedRows.collectAsList().asScala, indexSchema)
+          } else {
+            val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
+            spark.createDataFrame(rdd, indexSchema)
+          }
+
+          if (allowCaching) {
+            cachedColumnStatsIndexViews.put(targetColumns, df)
+            // NOTE: Instead of collecting the rows from the index and holding them in memory, we rely
+            //       on Spark as a (potentially distributed) cache managing the data lifecycle, while we simply
+            //       keep the reference to the persisted [[DataFrame]] instance
+            df.persist(StorageLevel.MEMORY_ONLY)
+
+            block(df)
+          } else {
+            withPersistedDataset(df) {
+              block(df)
+            }
+          }
+        }
     }
+  }
 
-    val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
-      .select(requiredMetadataIndexColumns.map(col): _*)
+  /**
+   * Loads a view of the Column Stats Index in a raw format, returning it as a [[DataFrame]]
+   *
+   * Please check out the scala-doc of the [[transpose]] method explaining this view in more detail
+   */
+  def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean = false): DataFrame = {
+    // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
+    //       by only fetching Column Stats Index records pertaining to the requested columns.
+    //       Otherwise we fallback to read whole Column Stats Index
+    if (targetColumns.nonEmpty) {
+      loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory)
+    } else {
+      loadFullColumnStatsIndexInternal()
+    }
+  }
 
-    colStatsDF
+  def invalidateCaches(): Unit = {
+    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+    cachedColumnStatsIndexViews.clear()
   }
 
   /**
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
    *       column references from the filtering expressions, and only transpose records corresponding to the
    *       columns referenced in those
    *
-   * @param spark Spark session ref
-   * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+   * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
    * @param queryColumns target columns to be included into the final table
-   * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
-    val colStatsSchema = colStatsDF.schema
-    val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
-      case (field, ordinal) => (field.name, ordinal)
-    }).toMap
-
+  private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
     val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
-    val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
-    val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
-    val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
-    val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-    val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-    val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
-    // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
-    //       w/in the transposed dataset: since some files might not have all of the columns indexed
-    //       either due to the Column Stats Index config changes, schema evolution, etc, we have
-    //       to make sure that all of the rows w/in transposed data-frame are properly padded (with null
-    //       values) for such file-column combinations
-    val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
-
     // NOTE: We're sorting the columns to make sure final index schema matches layout
     //       of the transposed table
-    val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
-
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+    val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+    val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+    // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
+    //       closures below
+    val indexedColumns = this.indexedColumns
+
+    // Here we perform complex transformation which requires us to modify the layout of the rows
+    // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
+    // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
+    // RDDs are not;
+    val transposedRows: HoodieData[Row] = colStatsRecords
+      // NOTE: Explicit conversion is required for Scala 2.11
+      .filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
+      .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+        if (r.getMinValue == null && r.getMaxValue == null) {
           // Corresponding row could be null in either of the 2 cases
           //    - Column contains only null values (in that case both min/max have to be nulls)
           //    - This is a stubbed Column Stats record (used as a tombstone)
-          row
+          collection.Pair.of(r.getFileName, r)
         } else {
-          val minValueStruct = row.getAs[Row](minValueOrdinal)
-          val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+          val minValueWrapper = r.getMinValue
+          val maxValueWrapper = r.getMaxValue
 
-          checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
+          checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
 
-          val colName = row.getString(colNameOrdinal)
+          val colName = r.getColumnName
           val colType = tableSchemaFieldMap(colName).dataType
 
-          val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
-          val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
-          val rowValsSeq = row.toSeq.toArray
+          val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
+          val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
+
           // Update min-/max-value structs w/ unwrapped values in-place
-          rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
-          rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+          r.setMinValue(minValue)
+          r.setMaxValue(maxValue)
 
-          Row(rowValsSeq: _*)
+          collection.Pair.of(r.getFileName, r)
         }
-      }
-      .groupBy(r => r.getString(fileNameOrdinal))
-      .foldByKey(Seq[Row]()) {
-        case (_, columnRowsSeq) =>
-          // Rows seq is always non-empty (otherwise it won't be grouped into)
-          val fileName = columnRowsSeq.head.get(fileNameOrdinal)
-          val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
-          // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
-          // to align existing column-stats for individual file with the list of expected ones for the
-          // whole transposed projection (a superset of all files)
-          val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
-          val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
-          val coalescedRowValuesSeq =
-            alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
-              case (acc, opt) =>
-                opt match {
-                  case Some(columnStatsRow) =>
-                    acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
-                  case None =>
-                    // NOTE: Since we're assuming missing column to essentially contain exclusively
-                    //       null values, we set null-count to be equal to value-count (this behavior is
-                    //       consistent with reading non-existent columns from Parquet)
-                    acc ++ Seq(null, null, valueCount)
-                }
-            }
-
-          Seq(Row(coalescedRowValuesSeq:_*))
-      }
-      .values
-      .flatMap(it => it)
+      }))
+      .groupByKey()
+      .map(JFunction.toJavaSerializableFunction(p => {
+        val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
+        val fileName: String = p.getKey
+        val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+        // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
+        // to align existing column-stats for individual file with the list of expected ones for the
+        // whole transposed projection (a superset of all files)
+        val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
+        val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
+
+        val coalescedRowValuesSeq =
+          alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
+            case (acc, opt) =>
+              opt match {
+                case Some(colStatRecord) =>
+                  acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
+                case None =>
+                  // NOTE: This could occur in either of the following cases:
+                  //    1. Column is not indexed in Column Stats Index: in this case we won't be returning
+                  //       any statistics for such column (ie all stats will be null)
+                  //    2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+                  //       in this case we're assuming missing column to essentially contain exclusively
+                  //       null values, we set min/max values as null and null-count to be equal to value-count (this
+                  //       behavior is consistent with reading non-existent columns from Parquet)
+                  //
+                  // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
+                  val idx = acc.length / 3
+                  val colName = sortedTargetColumns(idx)
+                  val indexed = indexedColumns.contains(colName)
+
+                  val nullCount = if (indexed) valueCount else null
+
+                  acc ++= Seq(null, null, nullCount)
+              }
+          }
+
+        Row(coalescedRowValuesSeq:_*)
+      }))
 
     // NOTE: It's crucial to maintain appropriate ordering of the columns
     //       matching table layout: hence, we cherry-pick individual columns
     //       instead of simply filtering in the ones we're interested in the schema
-    val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
+    val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+    (transposedRows, indexSchema)
+  }
 
-    spark.createDataFrame(transposedRDD, indexSchema)
+  private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+    val colStatsDF = {
+      val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+      // NOTE: Explicit conversion is required for Scala 2.11
+      val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+        val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
+        it.asScala.map(r => converter(r).orNull).asJava
+      }), false)
+
+      if (shouldReadInMemory) {
+        // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
+        //       of the transposed table in memory, facilitating execution of the subsequently chained operations
+        //       on it locally (on the driver; all such operations are actually going to be performed by Spark's
+        //       Optimizer)
+        createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType)
+      } else {
+        createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
+      }
+    }
+
+    colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
   }
 
-  private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+    // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
+    //    - Fetching the records from CSI by key-prefixes (encoded column names)
+    //    - Extracting [[HoodieMetadataColumnStats]] records
+    //    - Filtering out nulls
+    checkState(targetColumns.nonEmpty)
+
+    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+    val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+
+    val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+      metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+    val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+      // NOTE: Explicit conversion is required for Scala 2.11
+      metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+        toScalaOption(record.getData.getInsertValue(null, null))
+          .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+          .orNull
+      }))
+        .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
+
+    columnStatsRecords
+  }
+
+  private def loadFullColumnStatsIndexInternal(): DataFrame = {
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
     // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    spark.read.format("org.apache.hudi")
+    val colStatsDF = spark.read.format("org.apache.hudi")
       .options(metadataConfig.getProps.asScala)
       .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
-  }
-
-  private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
-    val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
 
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
-    //    - Fetching the records from CSI by key-prefixes (encoded column names)
-    //    - Deserializing fetched records into [[InternalRow]]s
-    //    - Composing [[DataFrame]]
-    val metadataTableDF = {
-      val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
-      // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-      val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
-
-      val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
-        HoodieJavaRDD.getJavaRDD(
-          metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-        )
-
-      val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
-        val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
-        val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
-
-        it.map { record =>
-          // schema and props are ignored for generating metadata record from the payload
-          // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
-          toScalaOption(record.getData.getInsertValue(null, null))
-            .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
-            .orNull
-        }
-      }
+    val requiredIndexColumns =
+      targetColumnStatsIndexColumns.map(colName =>
+        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
 
-      HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
-    }
-    metadataTableDF
+    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+      .select(requiredIndexColumns: _*)
   }
 }
 
 object ColumnStatsIndexSupport {
 
-  private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
-  private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+  private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
+    "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
+
+  /**
+   * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
+   * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in the Metadata Table
+   */
+  private val targetColumnStatsIndexColumns = Seq(
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
+    HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME
+  )
+
+  private val columnStatsRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$)
 
   /**
    * @VisibleForTesting
@@ -300,13 +417,28 @@ object ColumnStatsIndexSupport {
   @inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
     StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
 
-  private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
-    statStruct.toSeq.zipWithIndex
-      .find(_._1 != null)
-      // NOTE: First non-null value will be a wrapper (converted into Row), bearing a single
-      //       value
-      .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
-      .getOrElse((null, -1))
+  private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
+    valueWrapper match {
+      case w: BooleanWrapper => w.getValue
+      case w: IntWrapper => w.getValue
+      case w: LongWrapper => w.getValue
+      case w: FloatWrapper => w.getValue
+      case w: DoubleWrapper => w.getValue
+      case w: BytesWrapper => w.getValue
+      case w: StringWrapper => w.getValue
+      case w: DateWrapper => w.getValue
+      case w: DecimalWrapper => w.getValue
+      case w: TimeMicrosWrapper => w.getValue
+      case w: TimestampMicrosWrapper => w.getValue
+
+      case r: GenericData.Record if expectedAvroSchemaValues.contains(r.getSchema.getName) =>
+        r.get("value")
+
+      case _ => throw new UnsupportedOperationException(s"Not recognized value wrapper type (${valueWrapper.getClass.getSimpleName})")
+    }
+  }
+
+  val decConv = new DecimalConversion()
 
   private def deserialize(value: Any, dataType: DataType): Any = {
     dataType match {
@@ -315,12 +447,37 @@ object ColumnStatsIndexSupport {
       //       here we have to decode those back into corresponding logical representation.
       case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
       case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
-
+      // Standard types
+      case StringType => value
+      case BooleanType => value
+      // Numeric types
+      case FloatType => value
+      case DoubleType => value
+      case LongType => value
+      case IntegerType => value
       // NOTE: All integral types of size less than Int are encoded as Ints in MT
       case ShortType => value.asInstanceOf[Int].toShort
       case ByteType => value.asInstanceOf[Int].toByte
 
-      case _ => value
+      // TODO fix
+      case _: DecimalType =>
+        value match {
+          case buffer: ByteBuffer =>
+            val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
+            decConv.fromBytes(buffer, null, logicalType)
+          case _ => value
+        }
+      case BinaryType =>
+        value match {
+          case b: ByteBuffer =>
+            val bytes = new Array[Byte](b.remaining)
+            b.get(bytes)
+            bytes
+          case other => other
+        }
+
+      case _ =>
+        throw new UnsupportedOperationException(s"Data type for the statistic value is not recognized $dataType")
     }
   }
 }
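
Putting the pieces together, an illustrative sketch of how the reworked support class is driven (HoodieFileIndex is the actual caller in this commit; variable names below are placeholders):

    val columnStatsIndex = new ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, metaClient)
    if (columnStatsIndex.isIndexAvailable) {
      val targetColumns = Seq("col_a", "col_b")
      val readInMemory = columnStatsIndex.shouldReadInMemory(fileIndex, targetColumns)
      columnStatsIndex.loadTransposed(targetColumns, readInMemory) { transposedDF =>
        // One row per file, carrying min/max/null-count stats for every requested
        // column, ready for candidate-file pruning in Data Skipping
        transposedDF.count()
      }
    }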
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
new file mode 100644
index 0000000000..0f41dc1fff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.data.HoodieData
+import org.apache.spark.sql.Dataset
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
+
+object HoodieCatalystUtils extends SparkAdapterSupport {
+
+  /**
+   * Executes provided function while keeping provided [[Dataset]] instance persisted for the
+   * duration of the execution
+   *
+   * @param df target [[Dataset]] to be persisted
+   * @param level desired [[StorageLevel]] of the persistence
+   * @param f target function to be executed while [[Dataset]] is kept persisted
+   * @tparam T return value of the target function
+   * @return execution outcome of the [[f]] function
+   */
+  def withPersistedDataset[T](df: Dataset[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
+    df.persist(level)
+    try {
+      f
+    } finally {
+      df.unpersist()
+    }
+  }
+
+  /**
+   * Executes provided function while keeping provided [[HoodieData]] instance persisted for the
+   * duration of the execution
+   *
+   * @param data target [[HoodieData]] to be persisted
+   * @param level desired [[StorageLevel]] of the persistence
+   * @param f target function to be executed while [[HoodieData]] is kept persisted
+   * @tparam T return value of the target function
+   * @return execution outcome of the [[f]] function
+   */
+  def withPersistedData[T](data: HoodieData[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
+    data.persist(sparkAdapter.convertStorageLevelToString(level))
+    try {
+      f
+    } finally {
+      data.unpersist()
+    }
+  }
+}
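
A minimal usage sketch of the helper above (`df` is a placeholder for an already-constructed DataFrame):

    import org.apache.hudi.HoodieCatalystUtils.withPersistedDataset
    import org.apache.spark.storage.StorageLevel

    // df stays cached only while the block executes and is unpersisted afterwards
    val (total, sample) = withPersistedDataset(df, StorageLevel.MEMORY_ONLY) {
      (df.count(), df.take(10))
    }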
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index dad7c17650..09f1fac2c8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -27,11 +27,10 @@ import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
 import org.apache.hudi.table.BulkInsertPartitioner
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters.asScalaBufferConverter
@@ -92,9 +91,9 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
       val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
-      HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema)
+      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
     } else {
-      HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema)
     }
 
     val trimmedDF = if (shouldDropPartitionColumns) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
deleted file mode 100644
index a6c689610b..0000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-
-object HoodieDatasetUtils {
-
-  /**
-   * Executes provided function while keeping provided [[DataFrame]] instance persisted for the
-   * duration of the execution
-   *
-   * @param df target [[DataFrame]] to be persisted
-   * @param level desired [[StorageLevel]] of the persistence
-   * @param f target function to be executed while [[DataFrame]] is kept persisted
-   * @tparam T return value of the target function
-   * @return execution outcome of the [[f]] function
-   */
-  def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
-    df.persist(level)
-    try {
-      f
-    } finally {
-      df.unpersist()
-    }
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index bd15cb250f..4e158aaa86 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.HoodieDatasetUtils.withPersistence
 import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -26,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.HoodieMetadataPayload
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -35,7 +34,7 @@ import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndex
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.text.SimpleDateFormat
@@ -80,8 +79,9 @@ case class HoodieFileIndex(spark: SparkSession,
     specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
     fileStatusCache = fileStatusCache
   )
-    with FileIndex
-    with ColumnStatsIndexSupport {
+    with FileIndex {
+
+  @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
 
   override def rootPaths: Seq[Path] = queryPaths.asScala
 
@@ -95,8 +95,9 @@ case class HoodieFileIndex(spark: SparkSession,
    */
   def allFiles: Seq[FileStatus] = {
     cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
-      .filter(_.getBaseFile.isPresent)
-      .map(_.getBaseFile.get().getFileStatus)
+      .map(fs => fs.getBaseFile.orElse(null))
+      .filter(_ != null)
+      .map(_.getFileStatus)
       .toSeq
   }
 
@@ -196,64 +197,63 @@ case class HoodieFileIndex(spark: SparkSession,
     //          nothing CSI in particular could be applied for)
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
 
-    if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
+    if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) {
       validateConfig()
       Option.empty
     } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
       Option.empty
     } else {
-      val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
-
-      // Persist DF to avoid re-computing column statistics unraveling
-      withPersistence(colStatsDF) {
-        val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
-
-        // Persist DF to avoid re-computing column statistics unraveling
-        withPersistence(transposedColStatsDF) {
-          val indexSchema = transposedColStatsDF.schema
-          val indexFilter =
-            queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
-              .reduce(And)
-
-          val allIndexedFileNames =
-            transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-              .collect()
-              .map(_.getString(0))
-              .toSet
-
-          val prunedCandidateFileNames =
-            transposedColStatsDF.where(new Column(indexFilter))
-              .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-              .collect()
-              .map(_.getString(0))
-              .toSet
-
-          // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
-          //       base-file: since it's bound to clustering, which could occur asynchronously
-          //       at arbitrary point in time, and is not likely to be touching all of the base files.
-          //
-          //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
-          //       files and all outstanding base-files, and make sure that all base files not
-          //       represented w/in the index are included in the output of this method
-          val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
-
-          Some(prunedCandidateFileNames ++ notIndexedFileNames)
-        }
+      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
+      //       it's most often preferable to fetch the Column Stats Index w/in the same process (usually the driver),
+      //       w/o resorting to on-cluster execution.
+      //       For that we use a simple heuristic to determine whether we should read and process CSI in-memory or
+      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
+      //       threshold (of 100k records)
+      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
+
+      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
+        val indexSchema = transposedColStatsDF.schema
+        val indexFilter =
+          queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
+            .reduce(And)
+
+        val allIndexedFileNames =
+          transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+            .collect()
+            .map(_.getString(0))
+            .toSet
+
+        val prunedCandidateFileNames =
+          transposedColStatsDF.where(new Column(indexFilter))
+            .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+            .collect()
+            .map(_.getString(0))
+            .toSet
+
+        // NOTE: Col-Stats Index isn't guaranteed to have a complete set of statistics for every
+        //       base-file: it's bound to clustering, which could occur asynchronously
+        //       at an arbitrary point in time, and is not likely to touch all of the base files.
+        //
+        //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+        //       files and all outstanding base-files, and make sure that all base files not
+        //       represented w/in the index are included in the output of this method
+        val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
+
+        Some(prunedCandidateFileNames ++ notIndexedFileNames)
       }
     }
   }
 
-  override def refresh(): Unit = super.refresh()
+  override def refresh(): Unit = {
+    super.refresh()
+    columnStatsIndex.invalidateCaches()
+  }
 
   override def inputFiles: Array[String] =
     allFiles.map(_.getPath.toString).toArray
 
   override def sizeInBytes: Long = cachedFileSize
 
-  private def isColumnStatsIndexAvailable =
-    metaClient.getTableConfig.getMetadataPartitions
-      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-
   private def isDataSkippingEnabled: Boolean =
     options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
       spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
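
    The pruning performed in lookupCandidateFilesInMetadataTable above boils down to simple set arithmetic: the
    candidate set is every indexed file admitted by the column-stats filter, plus every base file that the index
    does not cover at all. A minimal standalone sketch of that reasoning, with plain Scala sets standing in for the
    DataFrame-based lookups (all names in the sketch are assumptions, not Hudi APIs):

        object DataSkippingSetSketch {

          /**
           * @param allBaseFiles       every base file currently visible to the query
           * @param indexedFiles       files that have an entry in the Column Stats Index
           * @param filesPassingFilter indexed files whose min/max ranges satisfy the query predicate
           * @return files the query still has to scan
           */
          def candidateFiles(allBaseFiles: Set[String],
                             indexedFiles: Set[String],
                             filesPassingFilter: Set[String]): Set[String] = {
            // Files absent from the index can never be pruned: the index isn't
            // guaranteed to be complete, so they must stay in the candidate set.
            val notIndexed = allBaseFiles -- indexedFiles
            filesPassingFilter ++ notIndexed
          }

          def main(args: Array[String]): Unit = {
            val all = Set("f1", "f2", "f3", "f4")
            val indexed = Set("f1", "f2", "f3")
            val passing = Set("f2")
            // f2 passes the filter, f4 was never indexed => both must be scanned
            assert(candidateFiles(all, indexed, passing) == Set("f2", "f4"))
          }
        }
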
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
similarity index 50%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index 8995701d5f..bd7f2f5456 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -21,17 +21,54 @@ package org.apache.spark.sql
 import org.apache.hudi.HoodieUnsafeRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 
 /**
  * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
  */
-object HoodieUnsafeRDDUtils {
+object HoodieUnsafeUtils {
 
-  // TODO scala-doc
-  def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
-    spark.internalCreateDataFrame(rdd, structType)
+  /**
+   * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]]
+   *
+   * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
+   *       will be executed by Spark locally
+   *
+   * @param spark spark's session
+   * @param rows collection of rows to base [[DataFrame]] on
+   * @param schema target [[DataFrame]]'s schema
+   * @return
+   */
+  def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
+    Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
+
+  /**
+   * Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]]
+   *
+   * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
+   *       will be executed by Spark locally
+   *
+   * @param spark spark's session
+   * @param rows collection of rows to base [[DataFrame]] on
+   * @param schema target [[DataFrame]]'s schema
+   * @return
+   */
+  def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
+    Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
+
+
+  /**
+   * Creates [[DataFrame]] from the [[RDD]] of [[Row]]s with provided [[schema]]
+   *
+   * @param spark spark's session
+   * @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
+   * @param schema target [[DataFrame]]'s schema
+   * @return
+   */
+  def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame =
+    spark.internalCreateDataFrame(rdd, schema)
 
   /**
    * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly
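
    As a quick illustration of the LocalRelation-backed constructors introduced above, the sketch below builds a
    small DataFrame entirely on the driver and filters it without scheduling cluster work; the schema and rows are
    invented for the example:

        import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SparkSession}
        import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

        val spark = SparkSession.builder().master("local[*]").appName("local-relation-sketch").getOrCreate()

        val schema = StructType(Seq(
          StructField("fileName", StringType, nullable = false),
          StructField("valueCount", LongType, nullable = false)))

        val rows = Seq(Row("f1.parquet", 10L), Row("f2.parquet", 20L))

        // Backed by a LocalRelation: filtering/collecting here is evaluated locally,
        // which is what the in-memory Column Stats Index path relies on.
        val df = HoodieUnsafeUtils.createDataFrameFromRows(spark, rows, schema)
        df.where("valueCount > 15").show()
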
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 1d4dbfb1ea..19027a47bf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -369,8 +369,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
     case class TestCase(enableMetadata: Boolean,
-                                    enableColumnStats: Boolean,
-                                    enableDataSkipping: Boolean)
+                        enableColumnStats: Boolean,
+                        enableDataSkipping: Boolean,
+                        columnStatsProcessingModeOverride: String = null)
 
     val testCases: Seq[TestCase] =
       TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
@@ -378,6 +379,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
       TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
       TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
       TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
+      TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) ::
+      TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) ::
       Nil
 
     for (testCase <- testCases) {
@@ -391,7 +394,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
       val props = Map[String, String](
         "path" -> basePath,
         QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
-        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString,
+        HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key -> testCase.columnStatsProcessingModeOverride
       ) ++ readMetadataOpts
 
       val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
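
    For readers who want to exercise the same knobs outside of the test above, here is a hedged sketch of a read
    that enables data skipping and pins the Column Stats Index processing mode. The table path and the predicate
    column are placeholders; the option keys are the constants referenced in the test plus the standard metadata
    enable flag:

        import org.apache.hudi.DataSourceReadOptions
        import org.apache.hudi.common.config.HoodieMetadataConfig
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").appName("data-skipping-sketch").getOrCreate()

        val basePath = "/tmp/hudi_table" // placeholder path

        val df = spark.read.format("hudi")
          // Data skipping only kicks in when both the metadata table and the flag are on
          .option(HoodieMetadataConfig.ENABLE.key, "true")
          .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
          // Force in-memory processing of the Column Stats Index instead of the engine
          // (Spark) path; omit the override to let the size heuristic decide
          .option(HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key,
            HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY)
          .load(basePath)

        df.where("c1 > 100").show() // 'c1' is an assumed indexed column
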
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index b982b1851c..822d2051cb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -31,12 +31,12 @@ import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.functions.{col, lit, typedLit}
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsSource, MethodSource, ValueSource}
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -44,7 +44,7 @@ import scala.collection.JavaConverters._
 import scala.util.Random
 
 @Tag("functional")
-class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSupport {
+class TestColumnStatsIndex extends HoodieClientTestBase {
   var spark: SparkSession = _
 
   val sourceTableSchema =
@@ -119,35 +119,31 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
       .fromProperties(toProperties(metadataOpts))
       .build()
 
-    val requestedColumns: Seq[String] = {
-      // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
-      // MT to be read, and subsequently filtered
-      if (testCase.readFullMetadataTable) Seq.empty
-      else sourceTableSchema.fieldNames
-    }
+    val requestedColumns: Seq[String] = sourceTableSchema.fieldNames
 
-    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
-    val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
     val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
 
-    // Match against expected column stats table
-    val expectedColStatsIndexTableDf =
-      spark.read
-        .schema(expectedColStatsSchema)
-        .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
+    columnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedColStatsDF =>
+      // Match against expected column stats table
+      val expectedColStatsIndexTableDf =
+        spark.read
+          .schema(expectedColStatsSchema)
+          .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
 
-    assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
-    // NOTE: We have to drop the `fileName` column as it contains semi-random components
-    //       that we can't control in this test. Nevertheless, since we manually verify composition of the
-    //       ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
-    assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
+      assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
+      // NOTE: We have to drop the `fileName` column as it contains semi-random components
+      //       that we can't control in this test. Nevertheless, since we manually verify composition of the
+      //       ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+      assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
 
-    // Collect Column Stats manually (reading individual Parquet files)
-    val manualColStatsTableDF =
-      buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+      // Collect Column Stats manually (reading individual Parquet files)
+      val manualColStatsTableDF =
+        buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
 
-    assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+      assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+    }
 
     // do an upsert and validate
     val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
@@ -166,26 +162,28 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
-    val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+    val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
-    val expectedColStatsIndexUpdatedDF =
-      spark.read
-        .schema(expectedColStatsSchema)
-        .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
+    updatedColumnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedUpdatedColStatsDF =>
+      val expectedColStatsIndexUpdatedDF =
+        spark.read
+          .schema(expectedColStatsSchema)
+          .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
 
-    assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
-    assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+      assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+      assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
 
-    // Collect Column Stats manually (reading individual Parquet files)
-    val manualUpdatedColStatsTableDF =
-      buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+      // Collect Column Stats manually (reading individual Parquet files)
+      val manualUpdatedColStatsTableDF =
+        buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
 
-    assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+      assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+    }
   }
 
-  @Test
-  def testMetadataColumnStatsIndexPartialProjection(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
     val targetColumnsToIndex = Seq("c1", "c2", "c3")
 
     val metadataOpts = Map(
@@ -235,11 +233,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
       // These are NOT indexed
       val requestedColumns = Seq("c4")
 
-      val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
-      val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema)
+      val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
-      assertEquals(0, emptyColStatsDF.collect().length)
-      assertEquals(0, emptyTransposedColStatsDF.collect().length)
+      columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { emptyTransposedColStatsDF =>
+        assertEquals(0, emptyTransposedColStatsDF.collect().length)
+      }
     }
 
     ////////////////////////////////////////////////////////////////////////
@@ -252,29 +250,27 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
       // We have to include "c1", since we sort the expected outputs by this column
       val requestedColumns = Seq("c4", "c1")
 
-      val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
-      val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema)
-
-      val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
-      val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
-
+      val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
       // Match against expected column stats table
       val expectedColStatsIndexTableDf =
         spark.read
           .schema(expectedColStatsSchema)
           .json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString)
 
-      assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
-      // NOTE: We have to drop the `fileName` column as it contains semi-random components
-      //       that we can't control in this test. Nevertheless, since we manually verify composition of the
-      //       ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
-      assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
-
       // Collect Column Stats manually (reading individual Parquet files)
       val manualColStatsTableDF =
-        buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
+        buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
+
+      val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
 
-      assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
+      columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+        assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
+        // NOTE: We have to drop the `fileName` column as it contains semi-random components
+        //       that we can't control in this test. Nevertheless, since we manually verify composition of the
+        //       ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+        assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
+        assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
+      }
     }
 
     ////////////////////////////////////////////////////////////////////////
@@ -307,27 +303,26 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
       val requestedColumns = sourceTableSchema.fieldNames
 
-      // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema),
-      // we should be able to read CSI, which will be properly padded (with nulls) after transposition
-      val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
-      val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema)
-
-      val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
-      val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
-
+      val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
       val expectedColStatsIndexUpdatedDF =
         spark.read
           .schema(expectedColStatsSchema)
           .json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString)
 
-      assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
-      assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
-
       // Collect Column Stats manually (reading individual Parquet files)
       val manualUpdatedColStatsTableDF =
-        buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
+        buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
 
-      assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+      val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
+
+      // Even though the last update was written with a new schema (that is a subset of the original table schema),
+      // we should still be able to read CSI, which will be properly padded (with nulls) after transposition
+      columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { transposedUpdatedColStatsDF =>
+        assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+
+        assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+        assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+      }
     }
   }
 
@@ -370,7 +365,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     })
   }
 
-  private def buildColumnStatsTableManually(tablePath: String, indexedCols: Seq[String], indexSchema: StructType) = {
+  private def buildColumnStatsTableManually(tablePath: String,
+                                            includedCols: Seq[String],
+                                            indexedCols: Seq[String],
+                                            indexSchema: StructType): DataFrame = {
     val files = {
       val it = fs.listFiles(new Path(tablePath), true)
       var seq = Seq[LocatedFileStatus]()
@@ -387,15 +385,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
           s"'${typedLit(file.getPath.getName)}' AS file" +:
           s"sum(1) AS valueCount" +:
             df.columns
-              .filter(col => indexedCols.contains(col))
+              .filter(col => includedCols.contains(col))
               .flatMap(col => {
                 val minColName = s"${col}_minValue"
                 val maxColName = s"${col}_maxValue"
-                Seq(
-                  s"min($col) AS $minColName",
-                  s"max($col) AS $maxColName",
-                  s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
-                )
+                if (indexedCols.contains(col)) {
+                  Seq(
+                    s"min($col) AS $minColName",
+                    s"max($col) AS $maxColName",
+                    s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
+                  )
+                } else {
+                  Seq(
+                    s"null AS $minColName",
+                    s"null AS $maxColName",
+                    s"null AS ${col}_nullCount"
+                  )
+                }
               })
 
         df.selectExpr(exprs: _*)
@@ -461,11 +467,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
 object TestColumnStatsIndex {
 
-  case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
+  case class ColumnStatsTestCase(forceFullLogScan: Boolean, shouldReadInMemory: Boolean)
 
   def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
     java.util.stream.Stream.of(
-      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
-      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = true)),
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = false)),
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = false)),
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = true))
     )
 }
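
    The manual verification helper updated above is essentially "read each Parquet file and aggregate min/max/null
    count per column". A condensed standalone sketch of that aggregation for a single file; the file path and column
    names are placeholders, not values from the test:

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").appName("manual-colstats-sketch").getOrCreate()

        val filePath = "/tmp/hudi_table/part-0001.parquet" // placeholder
        val indexedCols = Seq("c1", "c2")                  // columns assumed to be indexed

        val df = spark.read.parquet(filePath)

        // One row per file: fileName, valueCount, and per-column min/max/nullCount,
        // mirroring the shape of a transposed Column Stats Index row.
        val exprs =
          s"'${new java.io.File(filePath).getName}' AS fileName" +:
          "count(1) AS valueCount" +:
          indexedCols.flatMap(c => Seq(
            s"min($c) AS ${c}_minValue",
            s"max($c) AS ${c}_maxValue",
            s"sum(cast(isnull($c) AS long)) AS ${c}_nullCount"))

        df.selectExpr(exprs: _*).show()
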
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index b24a341d4a..00ab709144 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSo
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
+import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 import org.junit.jupiter.api.{Disabled, Tag, Test}
 
@@ -316,7 +316,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
 
       val (rows, bytesRead) = measureBytesRead { () =>
         val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD]
-        HoodieUnsafeRDDUtils.collect(rdd)
+        HoodieUnsafeUtils.collect(rdd)
       }
 
       val targetRecordCount = tableState.targetRecordCount;
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 30af252d2d..3c0282d710 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -22,9 +22,13 @@ import org.apache.avro.Schema
 import org.apache.hudi.Spark2RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.spark.sql.avro._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
 import org.apache.spark.sql.hudi.SparkAdapter
@@ -32,6 +36,8 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils, HoodieSpark2CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -115,4 +121,20 @@ class Spark2Adapter extends SparkAdapter {
   override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
     InterpretedPredicate.create(e)
   }
+
+  override def convertStorageLevelToString(level: StorageLevel): String = level match {
+    case NONE => "NONE"
+    case DISK_ONLY => "DISK_ONLY"
+    case DISK_ONLY_2 => "DISK_ONLY_2"
+    case MEMORY_ONLY => "MEMORY_ONLY"
+    case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+    case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+    case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+    case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+    case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+    case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+    case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+    case OFF_HEAP => "OFF_HEAP"
+    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 034d21dba4..4f55039746 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.{DISK_ONLY, DISK_ONLY_2, DISK_ONLY_3, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, NONE, OFF_HEAP}
 
 import scala.util.control.NonFatal
 
@@ -100,4 +102,24 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
   override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
     Predicate.createInterpreted(e)
   }
+
+  /**
+   * Converts instance of [[StorageLevel]] to a corresponding string
+   */
+  override def convertStorageLevelToString(level: StorageLevel): String = level match {
+    case NONE => "NONE"
+    case DISK_ONLY => "DISK_ONLY"
+    case DISK_ONLY_2 => "DISK_ONLY_2"
+    case DISK_ONLY_3 => "DISK_ONLY_3"
+    case MEMORY_ONLY => "MEMORY_ONLY"
+    case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+    case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+    case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+    case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+    case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+    case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+    case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+    case OFF_HEAP => "OFF_HEAP"
+    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
+  }
 }
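
    One practical note on the convertStorageLevelToString additions in both adapters: the emitted names match
    Spark's canonical StorageLevel constant names, so Spark's own StorageLevel.fromString can resolve them back to
    the original level. A small round-trip sketch (the check is illustrative, not part of the commit):

        import org.apache.spark.storage.StorageLevel

        // The adapter emits the canonical constant name ...
        val asString: String = "MEMORY_AND_DISK_SER"

        // ... and Spark resolves it back to the same StorageLevel.
        val level: StorageLevel = StorageLevel.fromString(asString)
        assert(level == StorageLevel.MEMORY_AND_DISK_SER)
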