@@ -112,154 +195,186 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
    *       column references from the filtering expressions, and only transpose records corresponding to the
    *       columns referenced in those
-   * @param spark Spark session ref
-   * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+   * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
    * @param queryColumns target columns to be included into the final table
-   * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
-    val colStatsSchema = colStatsDF.schema
-    val colStatsSchemaOrdinalsMap ={
-      case (field, ordinal) => (, ordinal)
-    }).toMap
+  private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
     val tableSchemaFieldMap = => (, f)).toMap
-    val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
-    val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
-    val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
-    val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-    val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-    val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-    // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
-    //       w/in the transposed dataset: since some files might not have all of the columns indexed
-    //       either due to the Column Stats Index config changes, schema evolution, etc, we have
-    //       to make sure that all of the rows w/in transposed data-frame are properly padded (with null
-    //       values) for such file-column combinations
-    val indexedColumns: Seq[String] = => row.getString(colNameOrdinal)).distinct().collect()
     // NOTE: We're sorting the columns to make sure final index schema matches layout
     //       of the transposed table
-    val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+    val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+    val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+    // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
+    //       closures below
+    val indexedColumns = this.indexedColumns
+    // Here we perform complex transformation which requires us to modify the layout of the rows
+    // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
+    // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
+    // RDDs are not;
+    val transposedRows: HoodieData[Row] = colStatsRecords
+      // NOTE: Explicit conversion is required for Scala 2.11
+      .filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
+      .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+        if (r.getMinValue == null && r.getMaxValue == null) {
           // Corresponding row could be null in either of the 2 cases
           //    - Column contains only null values (in that case both min/max have to be nulls)
           //    - This is a stubbed Column Stats record (used as a tombstone)
-          row
+          collection.Pair.of(r.getFileName, r)
         } else {
-          val minValueStruct = row.getAs[Row](minValueOrdinal)
-          val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+          val minValueWrapper = r.getMinValue
+          val maxValueWrapper = r.getMaxValue
-          checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
+          checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
-          val colName = row.getString(colNameOrdinal)
+          val colName = r.getColumnName
           val colType = tableSchemaFieldMap(colName).dataType
-          val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
-          val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
-          val rowValsSeq = row.toSeq.toArray
+          val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
+          val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
           // Update min-/max-value structs w/ unwrapped values in-place
-          rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
-          rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+          r.setMinValue(minValue)
+          r.setMaxValue(maxValue)
-          Row(rowValsSeq: _*)
+          collection.Pair.of(r.getFileName, r)
-      }
-      .groupBy(r => r.getString(fileNameOrdinal))
-      .foldByKey(Seq[Row]()) {
-        case (_, columnRowsSeq) =>
-          // Rows seq is always non-empty (otherwise it won't be grouped into)
-          val fileName = columnRowsSeq.head.get(fileNameOrdinal)
-          val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-          // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
-          // to align existing column-stats for individual file with the list of expected ones for the
-          // whole transposed projection (a superset of all files)
-          val columnRowsMap = => (row.getString(colNameOrdinal), row)).toMap
-          val alignedColumnRowsSeq =
-          val coalescedRowValuesSeq =
-            alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
-              case (acc, opt) =>
-                opt match {
-                  case Some(columnStatsRow) =>
-                    acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
-                  case None =>
-                    // NOTE: Since we're assuming missing column to essentially contain exclusively
-                    //       null values, we set null-count to be equal to value-count (this behavior is
-                    //       consistent with reading non-existent columns from Parquet)
-                    acc ++ Seq(null, null, valueCount)
-                }
-            }
-          Seq(Row(coalescedRowValuesSeq:_*))
-      }
-      .values
-      .flatMap(it => it)
+      }))
+      .groupByKey()
+      .map(JFunction.toJavaSerializableFunction(p => {
+        val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
+        val fileName: String = p.getKey
+        val valueCount: Long = columnRecordsSeq.head.getValueCount
+        // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
+        // to align existing column-stats for individual file with the list of expected ones for the
+        // whole transposed projection (a superset of all files)
+        val columnRecordsMap = => (r.getColumnName, r)).toMap
+        val alignedColStatRecordsSeq =
+        val coalescedRowValuesSeq =
+          alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
+            case (acc, opt) =>
+              opt match {
+                case Some(colStatRecord) =>
+                  acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
+                case None =>
+                  // NOTE: This could occur in either of the following cases:
+                  //    1. Column is not indexed in Column Stats Index: in this case we won't be returning
+                  //       any statistics for such column (ie all stats will be null)
+                  //    2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+                  //       in this case we're assuming missing column to essentially contain exclusively
+                  //       null values, we set min/max values as null and null-count to be equal to value-count (this
+                  //       behavior is consistent with reading non-existent columns from Parquet)
+                  //
+                  // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
+                  val idx = acc.length / 3
+                  val colName = sortedTargetColumns(idx)
+                  val indexed = indexedColumns.contains(colName)
+                  val nullCount = if (indexed) valueCount else null
+                  acc ++= Seq(null, null, nullCount)
+              }
+          }
+        Row(coalescedRowValuesSeq:_*)
+      }))
     // NOTE: It's crucial to maintain appropriate ordering of the columns
     //       matching table layout: hence, we cherry-pick individual columns
     //       instead of simply filtering in the ones we're interested in the schema
-    val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
+    val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+    (transposedRows, indexSchema)
+  }
-    spark.createDataFrame(transposedRDD, indexSchema)
+  /**
+   * Loads raw (non-transposed) Column Stats Index records for the provided target columns and
+   * assembles them into a [[DataFrame]] with the `columnStatsRecordStructType` schema.
+   *
+   * @param targetColumns columns whose Column Stats Index records should be loaded (forwarded to
+   *                      `loadColumnStatsIndexRecords`, which requires it to be non-empty)
+   * @param shouldReadInMemory when true, records are collected onto the driver and the resulting
+   *                           [[DataFrame]] is built from that local collection; otherwise it is
+   *                           built on top of an RDD
+   * @return [[DataFrame]] holding the loaded Column Stats Index rows
+   */
+  private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+    val colStatsDF = {
+      val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+      // NOTE: Explicit conversion is required for Scala 2.11
+      val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+        // Avro -> Catalyst converter is instantiated inside the closure, i.e. once per partition;
+        // records the converter does not produce a row for are emitted as nulls (`orNull`)
+        val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
+ => converter(r).orNull).asJava
+      }), false)
+      if (shouldReadInMemory) {
+        // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the (raw,
+        //       not yet transposed) Column Stats Index rows in memory, facilitating execution of the subsequently
+        //       chained operations on it locally (on the driver; all such operations are actually going to be
+        //       performed by Spark's Optimizer)
+        createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType)
+      } else {
+        // Keep the rows distributed: the [[DataFrame]] is built directly on top of the underlying RDD
+        createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
+      }
+    }
+ _*)
+  }
+  /**
+   * Fetches Column Stats Index records for the provided columns from the Metadata Table.
+   *
+   * @param targetColumns columns to fetch Column Stats Index records for; must be non-empty
+   *                      (enforced via `checkState`)
+   * @param shouldReadInMemory forwarded as-is to `metadataTable.getRecordsByKeyPrefixes`
+   * @return non-null [[HoodieMetadataColumnStats]] records extracted from the fetched metadata records
+   */
+  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+    // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
+    //    - Fetching the records from CSI by key-prefixes (encoded column names)
+    //    - Extracting [[HoodieMetadataColumnStats]] records
+    //    - Filtering out nulls
+    checkState(targetColumns.nonEmpty)
+    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+    val encodedTargetColumnNames = => new ColumnIndexID(colName).asBase64EncodedString())
+    val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+      metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+    val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+      // NOTE: Explicit conversion is required for Scala 2.11
+ => {
+        // Schema and props are passed as nulls here; a payload yielding no value, or a metadata record
+        // without Column Stats metadata, both map to null and are dropped by the filter below
+        toScalaOption(record.getData.getInsertValue(null, null))
+          .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+          .orNull
+      }))
+        .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
+    columnStatsRecords
-  private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+  private def loadFullColumnStatsIndexInternal(): DataFrame = {
+    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
     // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+    val colStatsDF ="org.apache.hudi")
-  }
-  private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
-    val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
-    //    - Fetching the records from CSI by key-prefixes (encoded column names)
-    //    - Deserializing fetched records into [[InternalRow]]s
-    //    - Composing [[DataFrame]]
-    val metadataTableDF = {
-      val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-      // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-      val encodedTargetColumnNames = => new ColumnIndexID(colName).asBase64EncodedString())
-      val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
-        HoodieJavaRDD.getJavaRDD(
-          metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-        )
-      val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
-        val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
-        val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
- { record =>
-          // schema and props are ignored for generating metadata record from the payload
-          // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
-          toScalaOption(record.getData.getInsertValue(null, null))
-            .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
-            .orNull
-        }
-      }
+    val requiredIndexColumns =
+ =>
+        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
-      HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
-    }
-    metadataTableDF
+    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+      .select(requiredIndexColumns: _*)
 object ColumnStatsIndexSupport {
-  private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
-  private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+  private val columnStatsIndexProjectionSizeInMemoryThreshold = 100000

Review Comment:
   > Could you explain why `columnStatsIndexProjectionSizeInMemoryThreshold` is set to 100,000?
   Would it be possible to make it configurable via a config parameter, keeping 100,000 as the default value?

