Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/21 19:23:11 UTC

[GitHub] [spark] sunchao commented on a change in pull request #33639: [SPARK-36645][SQL] Aggregate (Min/Max/Count) push down for Parquet

sunchao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r711225582



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -858,6 +858,14 @@ object SQLConf {
       .checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
       .createWithDefault(10)
 
+  val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
+    .doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
+      " down to Parquet for optimization. MAX/MIN/COUNT for Complex type and Timestamp" +

Review comment:
       nit: `Complex type and Timestamp` => `complex types and timestamp`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (_, i) =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName(i))
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      datetimeRebaseMode,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(1, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(1, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
+   *         The first element is the Parquet PrimitiveType of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks
+    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = mutable.ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)

Review comment:
       I wonder if we should consider case sensitivity in these cases too
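       Something like the following (just an untested sketch from my side, not necessarily how the
       final code should look) is what I had in mind, reusing the `isCaseSensitive` flag that is
       already passed into `getPushedDownAggResult`:

           // hypothetical helper; assumes the surrounding StructType `dataSchema` and the
           // aggregate's column name `colName` are in scope
           def fieldIndex(dataSchema: StructType, colName: String, isCaseSensitive: Boolean): Int = {
             if (isCaseSensitive) {
               dataSchema.fieldNames.indexOf(colName)
             } else {
               // match the column name case-insensitively when the session is case-insensitive
               dataSchema.fieldNames.indexWhere(_.equalsIgnoreCase(colName))
             }
           }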

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (_, i) =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName(i))
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      datetimeRebaseMode,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(1, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(1, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
+   *         The first element is the Parquet PrimitiveType of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks
+    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = mutable.ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "max(" + colName + ")"
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
+              value = currentMax
+            }
+          case min: Min =>
+            val colName = min.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "min(" + colName + ")"
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
+              value = currentMin
+            }
+          case count: Count =>
+            schemaName = "count(" + count.column.fieldNames.head + ")"
+            rowCount += block.getRowCount
+            var isPartitionCol = false
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {

Review comment:
       nit: missing space after `if`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (_, i) =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName(i))
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      datetimeRebaseMode,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(1, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(1, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
+   *         The first element is the Parquet PrimitiveType of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks
+    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = mutable.ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "max(" + colName + ")"
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
+              value = currentMax
+            }
+          case min: Min =>
+            val colName = min.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "min(" + colName + ")"
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
+              value = currentMin
+            }
+          case count: Count =>
+            schemaName = "count(" + count.column.fieldNames.head + ")"
+            rowCount += block.getRowCount
+            var isPartitionCol = false
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.

Review comment:
       nit: `Count (colName)` => `Count(colName)` (no space before the parenthesis)

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -79,6 +83,25 @@ case class ParquetPartitionReaderFactory(
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+  private val columnBatchSize = sqlConf.columnBatchSize
+
+  private def getFooter(file: PartitionedFile): ParquetMetadata = {

Review comment:
       I think we should take the start and length of the `file` into account and only do the
       aggregation over the row group metadata that falls within that range?
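       Roughly what I mean (just a sketch, `filterBlocks` is a hypothetical helper and not part of
       this PR): keep only the row groups whose starting offset falls inside this split's
       `[start, start + length)` range before computing the aggregates, e.g.

           import scala.collection.JavaConverters._
           import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
           import org.apache.spark.sql.execution.datasources.PartitionedFile

           // assumes `file.start` / `file.length` describe the byte range of this task's split
           private def filterBlocks(footer: ParquetMetadata, file: PartitionedFile): Seq[BlockMetaData] = {
             footer.getBlocks.asScala.filter { block =>
               val blockStart = block.getStartingPos
               blockStart >= file.start && blockStart < file.start + file.length
             }.toSeq
           }

       That way each task would only aggregate over its own row groups instead of the whole file.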

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (_, i) =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName(i))
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      datetimeRebaseMode,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(1, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(1, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
+   *         The first element is the Parquet PrimitiveType of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks
+    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = mutable.ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "max(" + colName + ")"
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
+              value = currentMax
+            }
+          case min: Min =>
+            val colName = min.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "min(" + colName + ")"
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
+              value = currentMin
+            }
+          case count: Count =>
+            schemaName = "count(" + count.column.fieldNames.head + ")"
+            rowCount += block.getRowCount
+            var isPartitionCol = false
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            schemaName = "count(*)"
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        primitiveTypeBuilder += Types.required(PrimitiveTypeName.INT64).named(schemaName);
+      } else {
+        valuesBuilder += value
+        val field = fields.get(index)
+        primitiveTypeBuilder += Types.required(field.asPrimitiveType().getPrimitiveTypeName)
+          .as(field.getLogicalTypeAnnotation)
+          .length(field.asPrimitiveType().getTypeLength)
+          .named(schemaName)
+      }
+    }
+    (primitiveTypeBuilder.result(), valuesBuilder.result())
+  }
+
+  /**
+   * Get the Max or Min value for ith column in the current block
+   *
+   * @return the Max or Min value
+   */
+  private def getCurrentBlockMaxOrMin(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int,
+      isMax: Boolean): Any = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()
+    if (!statistics.hasNonNullValue) {
+      throw new UnsupportedOperationException("No min/max found for Parquet file, Set SQLConf" +
+        s" ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute again")
+    } else {
+      if (isMax) statistics.genericGetMax() else statistics.genericGetMin()
+    }
+  }
+
+  private def getNumNulls(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int): Long = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()
+    if (!statistics.isNumNullsSet()) {
+      throw new UnsupportedOperationException("Number of nulls not set for Parquet file." +
+        s" Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute again")
+    }
+    statistics.getNumNulls();

Review comment:
       nit: remove `();`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (_, i) =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName(i))
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      datetimeRebaseMode,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(1, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(1, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
+   *         The first element is the Parquet PrimitiveType of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks
+    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = mutable.ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "max(" + colName + ")"
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
+              value = currentMax
+            }
+          case min: Min =>
+            val colName = min.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "min(" + colName + ")"
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
+              value = currentMin
+            }
+          case count: Count =>
+            schemaName = "count(" + count.column.fieldNames.head + ")"
+            rowCount += block.getRowCount
+            var isPartitionCol = false
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            schemaName = "count(*)"
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        primitiveTypeBuilder += Types.required(PrimitiveTypeName.INT64).named(schemaName);
+      } else {
+        valuesBuilder += value
+        val field = fields.get(index)
+        primitiveTypeBuilder += Types.required(field.asPrimitiveType().getPrimitiveTypeName)
+          .as(field.getLogicalTypeAnnotation)
+          .length(field.asPrimitiveType().getTypeLength)
+          .named(schemaName)
+      }
+    }
+    (primitiveTypeBuilder.result(), valuesBuilder.result())
+  }
+
+  /**
+   * Get the Max or Min value for ith column in the current block
+   *
+   * @return the Max or Min value
+   */
+  private def getCurrentBlockMaxOrMin(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int,
+      isMax: Boolean): Any = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()

Review comment:
       nit: remove `()` from `getStatistics()`.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,206 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =

Review comment:
       nit: maybe call this `primitiveTypes`.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -79,6 +83,25 @@ case class ParquetPartitionReaderFactory(
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+  private val columnBatchSize = sqlConf.columnBatchSize

Review comment:
       nit: remove, this val looks unused




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org