Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/17 00:33:55 UTC

[GitHub] [spark] c21 opened a new pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

c21 opened a new pull request #34298:
URL: https://github.com/apache/spark/pull/34298


   
   ### What changes were proposed in this pull request?
   This PR adds aggregate push down for the ORC data source v2 reader.
   
   At a high level, the PR does:
   
   * The supported aggregate expressions are MIN/MAX/COUNT, the same as [Parquet aggregate push down](https://github.com/apache/spark/pull/33639).
   * Nested columns, partition columns, and columns of Timestamp or Binary type are disallowed in MIN/MAX aggregate push down. All other column types are supported in MIN/MAX aggregate push down.
   * All column types are supported in COUNT aggregate push down.
   * Sub-fields of nested columns are disallowed in aggregate push down.
   * If the file does not have valid statistics, Spark throws an exception and fails the query.
   * If the aggregate has a filter or a group-by column, it is not pushed down (see the example queries sketched after this list).
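
   As a rough illustration of these eligibility rules (assuming an existing `SparkSession` named `spark` and a hypothetical ORC table `orc_table`; none of the names below come from the PR's test suite):

   ```scala
   // Hypothetical queries illustrating the rules above; "orc_table" and its
   // columns are made up for illustration only.

   // Eligible: plain MIN/MAX/COUNT over data columns, no filter or GROUP BY.
   spark.sql("SELECT MIN(price), MAX(price), COUNT(price), COUNT(*) FROM orc_table")

   // Not eligible: the aggregate is combined with a filter.
   spark.sql("SELECT COUNT(price) FROM orc_table WHERE category = 'book'")

   // Not eligible: the aggregate is combined with GROUP BY.
   spark.sql("SELECT MAX(price) FROM orc_table GROUP BY category")

   // Not eligible: MIN/MAX on a Timestamp column (or on a nested sub-field).
   spark.sql("SELECT MAX(event_time) FROM orc_table")
   ```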
   
   At the code level, the PR does:
   * `OrcScanBuilder`: `pushAggregation()` checks whether the aggregation can be pushed down. Most of the checking logic is shared between Parquet and ORC and is extracted into `AggregatePushDownUtils.getSchemaForPushedAggregation()`. `OrcScanBuilder` creates an `OrcScan` with the aggregation and the aggregation data schema.
   * `OrcScan`: `createReaderFactory` creates an ORC reader factory with the aggregation and schema. Similar to the corresponding change in `ParquetScan`.
   * `OrcPartitionReaderFactory`: `buildReaderWithAggregates` creates an ORC reader with aggregate push down (i.e. it reads the ORC file footer to process column statistics instead of reading the actual data in the file). `buildColumnarReaderWithAggregates` creates a columnar ORC reader similarly. Both delegate the footer reading to `OrcUtils.createAggInternalRowFromFooter`.
   * `OrcUtils.createAggInternalRowFromFooter`: reads the ORC file footer to process column statistics (the real heavy lifting happens here; a rough sketch of this footer-only path follows this list). Similar to `ParquetUtils.createAggInternalRowFromFooter`. It leverages utility methods such as `OrcFooterReader.readStatistics`.
   * `OrcFooterReader`: `readStatistics` reads the ORC `ColumnStatistics[]` into Spark's `OrcColumnsStatistics`. The transformation is needed because ORC stores all column statistics in a flattened array (in tree pre-order), which is hard to process, while Spark's `OrcColumnsStatistics` stores the statistics in a nested tree structure (e.g. like `StructType`). This is used by `OrcUtils.createAggInternalRowFromFooter`.
   * `OrcColumnsStatistics`: the easy-to-manipulate structure for ORC `ColumnStatistics`. This is used by `OrcFooterReader.readStatistics`.
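
   To make the footer-only path concrete, here is a minimal, hedged sketch of what `createAggInternalRowFromFooter` conceptually does, written directly against the ORC reader API (the file path is a placeholder, and a flat schema is assumed so that the first data column sits at index 1 of the flattened statistics array):

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path
   import org.apache.orc.{ColumnStatistics, IntegerColumnStatistics, OrcFile}

   // Hypothetical sketch: answer MIN/MAX/COUNT from ORC footer statistics only,
   // without reading any row data. Index 0 of getStatistics() is the root struct;
   // with a flat schema the first data column is at index 1.
   val conf = new Configuration()
   val reader = OrcFile.createReader(new Path("/tmp/example.orc"), OrcFile.readerOptions(conf))

   val stats: Array[ColumnStatistics] = reader.getStatistics
   val countStar: Long = reader.getNumberOfRows     // COUNT(*): all rows, including nulls
   val colStats = stats(1)
   val countCol: Long = colStats.getNumberOfValues  // COUNT(col): non-null values only

   colStats match {
     case s: IntegerColumnStatistics =>
       println(s"min=${s.getMinimum}, max=${s.getMaximum}, count=$countCol, count(*)=$countStar")
     case _ =>
       println("column statistics type not handled in this sketch")
   }
   ```

   The actual implementation instead goes through `OrcFooterReader.readStatistics` and `OrcColumnsStatistics` so that nested schemas map cleanly onto the flattened statistics array.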
   
   
   
   ### Why are the changes needed?
   To improve the performance of queries with aggregates.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. A user-facing config `spark.sql.orc.aggregatePushdown` is added to enable or disable aggregate push down for ORC. The feature is disabled by default.
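
   As a hedged usage sketch (assuming an existing `SparkSession` named `spark`; the path is a placeholder and the plan text mentioned in the comments is a guess that may differ between versions):

   ```scala
   // Hypothetical usage sketch; the physical-plan output is a guess and may look
   // different in practice.
   spark.conf.set("spark.sql.orc.aggregatePushdown", "true")

   val df = spark.read.orc("/tmp/example_orc_table")
   df.createOrReplaceTempView("t")

   // With push down enabled, the ORC scan node is expected to report the pushed
   // aggregates (e.g. something like "PushedAggregation: [MIN(price), COUNT(*)]")
   // instead of performing a full data read.
   spark.sql("SELECT MIN(price), COUNT(*) FROM t").explain()
   ```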
   
   ### How was this patch tested?
   Added unit tests in `FileSourceAggregatePushDownSuite.scala`. Refactored all the unit tests from https://github.com/apache/spark/pull/33639 so that they now run against both Parquet and ORC.
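
   For reference, a minimal standalone check in the same spirit (not the suite's actual code; the path and data below are made up) could compare results with the feature enabled and disabled:

   ```scala
   import org.apache.spark.sql.{Row, SparkSession}

   // Hypothetical end-to-end check, not the suite's actual code.
   val spark = SparkSession.builder().master("local[2]").appName("orc-agg-check").getOrCreate()
   import spark.implicits._

   val path = "/tmp/orc_agg_pushdown_check"
   Seq((1, 10L), (2, 20L), (3, 30L)).toDF("id", "value").write.mode("overwrite").orc(path)

   def run(pushdown: Boolean): Array[Row] = {
     spark.conf.set("spark.sql.orc.aggregatePushdown", pushdown.toString)
     spark.read.orc(path).selectExpr("MIN(value)", "MAX(value)", "COUNT(*)").collect()
   }

   // The pushed-down result must match the ordinary (non-pushed-down) result.
   assert(run(pushdown = true).sameElements(run(pushdown = false)))
   ```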




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945047639


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144326/
   




[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737894051



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
+ * this class is used to convert ORC {@link ColumnStatistics}s from the array into a nested tree
+ * structure, according to data types. The flattened array stores all data types (including nested
+ * types) in tree pre-order. This is used for aggregate push down in ORC.
+ *
+ * For nested data types (array, map and struct), the sub-field statistics are stored recursively
+ * inside parent column's `children` field. Here is an example of `OrcColumnStatistics`:
+ *
+ * Data schema:
+ * c1: int
+ * c2: struct<f1: int, f2: float>
+ * c3: map<key: int, value: string>
+ * c4: array<int>
+ *
+ *                        OrcColumnStatistics

Review comment:
       👍 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
       I'm just curious: per https://github.com/apache/orc/blob/main/proto/orc_proto.proto, min/max are optional fields, and ORC's [`ColumnStatisticsImpl`](https://github.com/apache/orc/blob/main/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java#L337) also doesn't set `minimum` or `maximum` if those protobuf fields are not defined.
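
       A rough sketch (not from the PR) of the kind of guard that concern implies; the PR instead wraps the whole footer read in exception handling, as a later review comment shows:

       ```scala
       import org.apache.orc.{ColumnStatistics, IntegerColumnStatistics}

       // Hypothetical guard, not from the PR: if a column's statistics carry no
       // values (e.g. the column is entirely null), its min/max cannot be trusted,
       // so report "no statistics" instead of a bogus aggregate. This does not
       // cover every way the optional protobuf fields can be absent.
       def integerMax(stats: ColumnStatistics): Option[Long] = stats match {
         case s: IntegerColumnStatistics if s.getNumberOfValues > 0 => Some(s.getMaximum)
         case _ => None
       }
       ```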






[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737762020



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col

Review comment:
       @viirya - rebased.






[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-950040725


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144547/
   




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r734787209



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,

Review comment:
       Yes, added comment.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also returns
+          // number of non-null and null values for its top-level
+          // ColumnStatistics.getNumberOfValues().
+          val nonNullRowsCount = if (isPartitionColumn) {
+            val topLevelStatistics = columnsStatistics.getStatistics
+            if (topLevelStatistics.hasNull) {
+              throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +
+                s"values: $topLevelStatistics. Aggregate expression: $count")
+            }
+            topLevelStatistics.getNumberOfValues
+          } else {
+            getColumnStatistics(columnName).getNumberOfValues
+          }
+          new LongWritable(nonNullRowsCount)
+        case (_: CountStar, _) =>
+          // Count(*) includes both null and non-null values.
+          val topLevelStatistics = columnsStatistics.getStatistics
+          if (topLevelStatistics.hasNull) {
+            throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +

Review comment:
       @sunchao - yes same as above, this error message is quite confusing. Removed.

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
+ * this class is used to convert ORC {@link ColumnStatistics}s from the array into a nested tree structure,
+ * according to data types. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnsStatistics {

Review comment:
       @sunchao - updated with new name.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {

Review comment:
       @sunchao - sorry, removed.

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcFooterReader.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.spark.sql.types.*;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * `OrcFooterReader` is a util class which encapsulates the helper
+ * methods of reading ORC file footer.
+ */
+public class OrcFooterReader {
+
+  /**
+   * Read the columns statistics from ORC file footer.
+   *
+   * @param orcReader the reader to read ORC file footer.
+   * @return Statistics for all columns in the file.
+   */
+  public static OrcColumnsStatistics readStatistics(Reader orcReader) {
+    TypeDescription orcSchema = orcReader.getSchema();
+    ColumnStatistics[] orcStatistics = orcReader.getStatistics();
+    StructType dataType = OrcUtils.toCatalystSchema(orcSchema);

Review comment:
       @sunchao - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>

Review comment:
       @sunchao - you are right. We should not push down string MIN/MAX for ORC; I just checked that [string-type MIN/MAX statistics are truncated by default when they exceed 1024 characters](https://github.com/apache/orc/blob/main/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java#L719-L723). Removed, thanks.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -87,84 +86,45 @@ case class ParquetScanBuilder(
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
   override def pushAggregation(aggregation: Aggregation): Boolean = {
-
-    def getStructFieldForCol(col: NamedReference): StructField = {
-      schema.nameToField(col.fieldNames.head)
-    }
-
-    def isPartitionCol(col: NamedReference) = {
-      partitionNameSet.contains(col.fieldNames.head)
+    if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
+      return false
     }
 
-    def processMinOrMax(agg: AggregateFunc): Boolean = {
-      val (column, aggType) = agg match {
-        case max: Max => (max.column, "max")
-        case min: Min => (min.column, "min")
-        case _ =>
-          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
-      }
-
-      if (isPartitionCol(column)) {
-        // don't push down partition column, footer doesn't have max/min for partition column
-        return false
-      }
-      val structField = getStructFieldForCol(column)
-
-      structField.dataType match {
-        // not push down complex type
-        // not push down Timestamp because INT96 sort order is undefined,
-        // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex type.
+        // Not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96.
+        // Not push down Binary type as Parquet can truncate the statistics.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       @sunchao - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],

Review comment:
       @sunchao - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +

Review comment:
       @sunchao - sorry, added.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value

Review comment:
       @sunchao - removed.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -123,36 +126,11 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - Count(partition Col): push down") {

Review comment:
       @huaxingao - added back.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,

Review comment:
       @sunchao - sorry, removed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values

Review comment:
       @sunchao - rephrased a bit. ORC stores the number of non-null values in each column's statistics. In addition, ORC stores the number of all values (null and non-null) at the top level, separately from any individual column's statistics.
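
       A hedged sketch of that distinction against the ORC reader API (the path is a placeholder, and a flat schema is assumed so that index 1 is the first data column):

       ```scala
       import org.apache.hadoop.conf.Configuration
       import org.apache.hadoop.fs.Path
       import org.apache.orc.OrcFile

       // Hypothetical illustration of the two counts described above.
       val reader = OrcFile.createReader(new Path("/tmp/example.orc"),
         OrcFile.readerOptions(new Configuration()))
       val countStar = reader.getNumberOfRows                       // every row, null or not => COUNT(*)
       val countCol  = reader.getStatistics()(1).getNumberOfValues  // non-null values only   => COUNT(col)
       ```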

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
##########
@@ -37,35 +38,65 @@ case class OrcScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
+    pushedAggregate: Option[Aggregation] = None,
     pushedFilters: Array[Filter],
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so the file should not be split across multiple tasks.
+    pushedAggregate.isEmpty

Review comment:
       @sunchao - agreed, that's why I diverge from the Parquet code path for this. We should make sure the file is processed by only one task; splitting the file across multiple tasks is pointless here. I can make the same change on the Parquet side after this PR is merged.
   
   > Also maybe we should change how we measure file weight when combining tasks for aggregate pushdown, since we can combine multiple large files into a single task as computing stats is much cheaper.
   
   Yes, I thought about this as well. It's not trivial though, as we need to come up with another heuristic for how to combine files when aggregates are pushed down.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also returns
+          // number of non-null and null values for its top-level
+          // ColumnStatistics.getNumberOfValues().
+          val nonNullRowsCount = if (isPartitionColumn) {
+            val topLevelStatistics = columnsStatistics.getStatistics
+            if (topLevelStatistics.hasNull) {
+              throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +

Review comment:
       @sunchao - here it means the ORC file is invalid. Actually we don't need this check, as ORC guarantees this and this error message is quite confusing. Removed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
##########
@@ -58,4 +72,35 @@ case class OrcScanBuilder(
       Array.empty[Filter]
     }
   }
+
+  override def pushAggregation(aggregation: Aggregation): Boolean = {
+    if (!sparkSession.sessionState.conf.orcAggregatePushDown) {
+      return false
+    }
+
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex and Timestamp type.
+        // Not push down Binary type as ORC does not write min/max statistics for it.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       @sunchao - yes, updated.
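
For reviewers trying the feature out, a hedged end-to-end usage sketch (the path and column names are made up; `spark.sql.orc.aggregatePushdown` is the conf key referenced elsewhere in this PR, and the DSv2 ORC reader must be in effect, e.g. by clearing `spark.sql.sources.useV1SourceList`):

```scala
// Illustrative only; not from the PR.
import org.apache.spark.sql.functions.{count, min}

spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
spark.conf.set("spark.sql.sources.useV1SourceList", "")  // make sure the v2 ORC reader is used

spark.range(100).selectExpr("id", "id % 10 AS bucket")
  .write.mode("overwrite").orc("/tmp/orc_agg_demo")

val df = spark.read.orc("/tmp/orc_agg_demo").agg(min("id"), count("id"))
// With push down, the ORC scan node should report the pushed aggregates (MIN(id), COUNT(id))
// instead of reading row data; no filter or group by is used, so the query qualifies.
df.explain()
```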






[GitHub] [spark] huaxingao commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953541079


   LGTM




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738654047



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,116 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    var columnsStatistics: OrcColumnStatistics = null
+    try {
+      columnsStatistics = OrcFooterReader.readStatistics(reader)
+    } catch { case e: RuntimeException =>
+      throw new SparkException(
+        s"Cannot read columns statistics in file: $filePath. Please consider disabling " +
+        s"ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.", e)
+    }

Review comment:
       I think the only case is when the statistics are missing and `OrcFooterReader.convertStatistics():orcStatistics.remove()` throws a `RuntimeException`. But just to be safe, I changed it to match all `Exception`s here.
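
A minimal sketch of the broadened handling described above, using the `reader` and `filePath` names from the quoted diff; catching `Exception` keeps the hint about disabling the conf for any statistics-reading failure, and a `try` expression avoids the `var`/null initialization:

```scala
// Sketch only; mirrors the quoted diff with the broader catch discussed above.
val columnsStatistics =
  try {
    OrcFooterReader.readStatistics(reader)
  } catch {
    case e: Exception =>
      throw new SparkException(
        s"Cannot read columns statistics in file: $filePath. Please consider disabling " +
        "ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.", e)
  }
```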

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
+ * this class is used to convert ORC {@link ColumnStatistics}s from the array into a nested tree
+ * structure, according to data types. The flattened array stores all data types (including
+ * nested types) in tree pre-order. This is used for aggregate push down in ORC.
+ *
+ * For nested data types (array, map and struct), the sub-field statistics are stored recursively
+ * inside parent column's `children` field. Here is an example of `OrcColumnStatistics`:

Review comment:
       @huaxingao - thanks, updated all class-level comments. For method comments or comments inside a method body, {@code} does not work, so those are not changed. Back quotes are also used elsewhere in our code base for comments inside method bodies.
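
To illustrate the pre-order flattening described in the class comment above, here is a hedged example (not necessarily the exact example used in the final javadoc) for a schema with an array and a map; each nested type keeps its sub-field statistics in `children`:

```
Schema: struct<id: int, tags: array<string>, props: map<string, int>>

Flattened pre-order statistics array in the footer (one entry per type node):
  [root struct, id, tags, tags.element, props, props.key, props.value]

Rebuilt OrcColumnStatistics tree (sub-field statistics live in `children`):
  root (struct)
  ├── id
  ├── tags (array)
  │   └── element
  └── props (map)
      ├── key
      └── value
```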






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954128582


   **[Test build #144724 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144724/testReport)** for PR 34298 at commit [`d85d4ba`](https://github.com/apache/spark/commit/d85d4ba720a0936ee636320be8cfefe9a72565f9).




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952542508


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144632/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953398265


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144676/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954128582


   **[Test build #144724 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144724/testReport)** for PR 34298 at commit [`d85d4ba`](https://github.com/apache/spark/commit/d85d4ba720a0936ee636320be8cfefe9a72565f9).




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954315584


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144724/
   




[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737900404



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
       I'm just curious: per https://github.com/apache/orc/blob/main/proto/orc_proto.proto, min/max are optional fields, and ORC's [ColumnStatisticsImpl](https://github.com/apache/orc/blob/main/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java#L337) also doesn't set `minimum` or `maximum` if those protobuf fields are not defined.
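
A hedged follow-up on this point: since min/max are optional in the protobuf, one defensive option (not necessarily what the PR does) is to treat a column whose statistics report zero non-null values as having no usable min/max:

```scala
import org.apache.orc.ColumnStatistics

// Sketch only: getNumberOfValues() counts non-null values, so a column with zero
// non-null values cannot carry a meaningful minimum or maximum.
def hasUsableMinMax(statistics: ColumnStatistics): Boolean =
  statistics.getNumberOfValues > 0
```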






[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r730199840



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##########
@@ -115,7 +115,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   def names: Array[String] = fieldNames
 
   private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
-  private[sql] lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+  private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap

Review comment:
       This reverts the change in https://github.com/apache/spark/pull/33639, as we don't need to make it more public.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -353,33 +325,33 @@ abstract class ParquetAggregatePushDownSuite
 
     val rdd = sparkContext.parallelize(rows)
     withTempPath { file =>
-      spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
+      spark.createDataFrame(rdd, schema).write.format(format).save(file.getCanonicalPath)
       withTempView("test") {
-        spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
-          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+        spark.read.format(format).load(file.getCanonicalPath).createOrReplaceTempView("test")
+
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(aggPushDownEnabledKey -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
 
             val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
-              "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +

Review comment:
       Removed the test for the Binary column in MIN/MAX here, as we are discussing removing that support in Parquet, and ORC does not support it at all.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -87,84 +86,45 @@ case class ParquetScanBuilder(
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
   override def pushAggregation(aggregation: Aggregation): Boolean = {
-
-    def getStructFieldForCol(col: NamedReference): StructField = {
-      schema.nameToField(col.fieldNames.head)
-    }
-
-    def isPartitionCol(col: NamedReference) = {
-      partitionNameSet.contains(col.fieldNames.head)
+    if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
+      return false
     }
 
-    def processMinOrMax(agg: AggregateFunc): Boolean = {
-      val (column, aggType) = agg match {
-        case max: Max => (max.column, "max")
-        case min: Min => (min.column, "min")
-        case _ =>
-          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
-      }
-
-      if (isPartitionCol(column)) {
-        // don't push down partition column, footer doesn't have max/min for partition column
-        return false
-      }
-      val structField = getStructFieldForCol(column)
-
-      structField.dataType match {
-        // not push down complex type
-        // not push down Timestamp because INT96 sort order is undefined,
-        // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex type.
+        // Not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96.
+        // Not push down Binary type as Parquet can truncate the statistics.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       Adding a check in Parquet to disallow `BinaryType` here. This makes the shared unit tests between Parquet and ORC easier, and we are discussing disallowing it anyway. cc @huaxingao, feel free to ask me to revert the change if it does not make sense. Thanks.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col

Review comment:
       @viirya - https://github.com/apache/spark/pull/34248 is not merged yet; I can rebase later once it is merged.
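
For clarity, a hedged example of the schema `getSchemaForPushedAggregation` above builds when push down succeeds: for a query like `SELECT MIN(a), COUNT(b), COUNT(*) FROM t` with no filter and no group by, and assuming column `a` is a non-partition `IntegerType` column, the result is roughly:

```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Approximate aggregate schema produced by the code above; MIN keeps the column's own
// type and the name pattern "min(col)", while COUNT columns are always LongType.
val expectedAggSchema = new StructType()
  .add(StructField("min(a)", IntegerType))
  .add(StructField("count(b)", LongType))
  .add(StructField("count(*)", LongType))
```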

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
+ * this class is used to convert ORC {@link ColumnStatistics}s from the array into a nested tree
+ * structure, according to data types. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnsStatistics {

Review comment:
       No fundamental reason actually. I was following `OrcColumnVector`, which converts an ORC object to its Spark counterpart. I can move it if needed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.groupByColumns.foreach { col =>

Review comment:
       Good call. Didn't change it when moving logic from https://github.com/apache/spark/commit/128168d8c4019a1e10a9f1be734868524f6a09f0 to here. Will update.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.groupByColumns.foreach { col =>

Review comment:
       @viirya - updated, thanks.






[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737923928



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
       I see. Thanks @c21 !






[GitHub] [spark] viirya commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737115431



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNames: Set[String],
+      dataFilters: Seq[Expression]): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNames.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      structField.dataType match {
+        // not push down complex type
+        // not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType.
+        // not push down for ORC with same reason.
+        case BooleanType | ByteType | ShortType | IntegerType
+             | LongType | FloatType | DoubleType | DateType =>
+          finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+          true
+        case _ =>
+          false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.aggregateExpressions.foreach {
+      case max: Max =>
+        if (!processMinOrMax(max)) return None
+      case min: Min =>
+        if (!processMinOrMax(min)) return None
+      case count: Count =>
+        if (count.column.fieldNames.length != 1 || count.isDistinct) return None
+        finalSchema =
+          finalSchema.add(StructField(s"count(" + count.column.fieldNames.head + ")", LongType))
+      case _: CountStar =>
+        finalSchema = finalSchema.add(StructField("count(*)", LongType))
+      case _ =>
+        return None
+    }
+
+    Some(finalSchema)
+  }
+
+  /**
+   * Check if two Aggregation `a` and `b` is equal or not.
+   */
+  def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {

Review comment:
       Oh, I see.
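
For readers following this thread: `Aggregation` here is a DSv2 connector expression rather than a Catalyst plan node, so `sameResult` is not available on it. A minimal sketch of what a structural comparison could look like (an assumption for illustration, not the PR's actual implementation):

```scala
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation

// Sketch only: compare the connector expressions by their describe() output.
def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {
  a.aggregateExpressions.map(_.describe).toSeq == b.aggregateExpressions.map(_.describe).toSeq &&
    a.groupByColumns.map(_.describe).toSeq == b.groupByColumns.map(_.describe).toSeq
}
```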






[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738042531



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,116 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    var columnsStatistics: OrcColumnStatistics = null
+    try {
+      columnsStatistics = OrcFooterReader.readStatistics(reader)
+    } catch { case e: RuntimeException =>
+      throw new SparkException(
+        s"Cannot read columns statistics in file: $filePath. Please consider disabling " +
+        s"ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.", e)
+    }

Review comment:
       I think I need to do the same thing for Parquet too. When the column statistics can't be read, is this guaranteed to be a `RuntimeException`, or could it also be another Exception or Error?






[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954321576


   Thank you @viirya, @sunchao and @huaxingao for review!




[GitHub] [spark] huaxingao commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-951358417


   @c21 Did you have a chance to test large ORC files with multiple partitions? For Parquet, I did some testing using a customer's data, but I am still not sure my testing was sufficient. Please do more testing if possible. Thanks!




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r735981668



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
##########
@@ -37,35 +38,65 @@ case class OrcScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
+    pushedAggregate: Option[Aggregation] = None,
     pushedFilters: Array[Filter],
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so the file should not be split across multiple tasks.
+    pushedAggregate.isEmpty

Review comment:
       Yes, I agree this is a better approach, and Parquet should do it this way too.






[GitHub] [spark] viirya commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r736213544



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
+ * this class is used to convert ORC {@link ColumnStatistics}s from the array into a nested tree
+ * structure, according to data types. The flattened array stores all data types (including
+ * nested types) in tree pre-order. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnStatistics {
+  private final ColumnStatistics statistics;
+  private final List<OrcColumnStatistics> children;

Review comment:
       Can you add a few comments about how we store `OrcColumnStatistics`, especially for map and array types? Although it is understandable by reading `convertStatistics`, it is better to let readers/callers quickly know the format.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNames: Set[String],
+      dataFilters: Seq[Expression]): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNames.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      structField.dataType match {
+        // not push down complex type
+        // not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType.
+        // not push down for ORC with same reason.
+        case BooleanType | ByteType | ShortType | IntegerType
+             | LongType | FloatType | DoubleType | DateType =>
+          finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+          true
+        case _ =>
+          false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.aggregateExpressions.foreach {
+      case max: Max =>
+        if (!processMinOrMax(max)) return None
+      case min: Min =>
+        if (!processMinOrMax(min)) return None
+      case count: Count =>
+        if (count.column.fieldNames.length != 1 || count.isDistinct) return None
+        finalSchema =
+          finalSchema.add(StructField(s"count(" + count.column.fieldNames.head + ")", LongType))
+      case _: CountStar =>
+        finalSchema = finalSchema.add(StructField("count(*)", LongType))
+      case _ =>
+        return None
+    }
+
+    Some(finalSchema)
+  }
+
+  /**
+   * Check if two Aggregation `a` and `b` is equal or not.
+   */
+  def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {

Review comment:
       Do we need this? Can't we use `sameResult`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +

Review comment:
       Why not use `DateColumnStatistics` instead of `${statistics.getClass.getName}`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +

Review comment:
       getMinMaxFromColumnStatistics






[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r736000999



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +

Review comment:
       nit: add a space at the end

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -175,24 +175,26 @@ case class ParquetPartitionReaderFactory(
     } else {
       new PartitionReader[ColumnarBatch] {
         private var hasNext = true
-        private val row: ColumnarBatch = {
+        private val batch: ColumnarBatch = {
           val footer = getFooter(file)
           if (footer != null && footer.getBlocks.size > 0) {
-            ParquetUtils.createAggColumnarBatchFromFooter(footer, file.filePath, dataSchema,
-              partitionSchema, aggregation.get, readDataSchema, enableOffHeapColumnVector,
+            val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath,
+              dataSchema, partitionSchema, aggregation.get, readDataSchema,
               getDatetimeRebaseMode(footer.getFileMetaData), isCaseSensitive)
+            AggregatePushDownUtils.convertAggregatesRowToBatch(
+              row, readDataSchema, enableOffHeapColumnVector)

Review comment:
       in case we are using off-heap memory, we might want to check `taskContext.isDefined` since otherwise the task completion listener may not be triggered to free up the memory?
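       For illustration, a minimal sketch of the guard being suggested, reusing the call from the diff above and assuming a `taskContext: Option[TaskContext]` in scope (e.g. `Option(TaskContext.get())`); this is not the PR's actual change:

           // Only request off-heap vectors when running inside a task, so the task
           // completion listener can later free the off-heap memory.
           val batch = AggregatePushDownUtils.convertAggregatesRowToBatch(
             row, readDataSchema, enableOffHeapColumnVector && taskContext.isDefined)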

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -960,6 +960,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
+      " down to ORC for optimization. MAX/MIN for complex types can't be pushed down")

Review comment:
       nit: does it mean `COUNT` for complex types can be pushed down? maybe make it more explicit.
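       For illustration, a minimal usage sketch with the new config (table and column names are assumed):

           // Enable ORC aggregate push down for this session.
           spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
           // MIN/MAX/COUNT without filter or GROUP BY can then be answered from
           // ORC footer statistics instead of scanning the data files.
           spark.sql("SELECT MIN(int_col), MAX(date_col), COUNT(*) FROM orc_table").show()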

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum

Review comment:
       what if the column has 0 values, will min/max still be defined?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
       hmm, does an ORC file always have stats?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also stores number
+          // of all values (null and non-null) separately.
+          val nonNullRowsCount = if (isPartitionColumn) {
+            columnsStatistics.getStatistics.getNumberOfValues

Review comment:
       hm, why can we include both null and non-null values when the column is a partition column?






[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945022000


   **[Test build #144326 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144326/testReport)** for PR 34298 at commit [`1f36f12`](https://github.com/apache/spark/commit/1f36f12fdfb99bd737f02859da19630e48c09956).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952542508


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144632/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953398265


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144676/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945032390


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48805/
   




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r730368632



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -123,36 +126,11 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - Count(partition Col): push down") {

Review comment:
       @huaxingao - yes, we still support it. Sorry, I removed it by mistake when copying the file. Let me add it back.
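       For illustration, a sketch of what the restored test could look like; it writes ORC directly and relies on the suite's usual helpers and imports (`withTempPath`, `withSQLConf`, `checkAnswer`, `Row`), so the column names and exact structure here are assumed rather than the actual test:

           test("aggregate push down - Count(partition Col): push down") {
             withTempPath { dir =>
               // Write a small ORC table partitioned by column `p`.
               spark.range(10).selectExpr("id", "id % 2 AS p")
                 .write.partitionBy("p").orc(dir.getCanonicalPath)
               withSQLConf(SQLConf.ORC_AGGREGATE_PUSHDOWN_ENABLED.key -> "true") {
                 // COUNT on a partition column can still be pushed down.
                 checkAnswer(
                   spark.read.orc(dir.getCanonicalPath).selectExpr("count(p)"),
                   Seq(Row(10L)))
               }
             }
           }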






[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r730348686



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
##########
@@ -123,36 +126,11 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - Count(partition Col): push down") {

Review comment:
       We still support pushing down Count(partition Col), right? Do we still need this test?






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953395365


   **[Test build #144676 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144676/testReport)** for PR 34298 at commit [`8c7c617`](https://github.com/apache/spark/commit/8c7c6178ae145190a6fae6fd2024946578362312).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952541525


   **[Test build #144632 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144632/testReport)** for PR 34298 at commit [`9b8b9ef`](https://github.com/apache/spark/commit/9b8b9ef7efa3ab055edced6d039cc98867f3483f).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953260405


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49145/
   




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738040552



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -960,6 +960,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
+      "COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
+      "type. For COUNT, support all data types.")

Review comment:
       I meant to just use integer to represent all integral types (byte, short, int, long) and float to represent all floating-point types (float and double), to be less verbose. We will update the Spark doc on the website with a more detailed explanation of this aggregate push down feature anyway (ideally a sheet).






[GitHub] [spark] viirya commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738620943



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. The flatten array stores all data types (including nested types) in
+ * tree pre-ordering. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnStatistics {
+  private final ColumnStatistics statistics;
+  private final List<OrcColumnStatistics> children;

Review comment:
       thank you.






[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952411538


   Addressed all comments, and the PR is ready for review again, thanks @viirya, @sunchao and @huaxingao.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954181804


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49194/
   




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r730348674



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -87,84 +86,45 @@ case class ParquetScanBuilder(
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
   override def pushAggregation(aggregation: Aggregation): Boolean = {
-
-    def getStructFieldForCol(col: NamedReference): StructField = {
-      schema.nameToField(col.fieldNames.head)
-    }
-
-    def isPartitionCol(col: NamedReference) = {
-      partitionNameSet.contains(col.fieldNames.head)
+    if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
+      return false
     }
 
-    def processMinOrMax(agg: AggregateFunc): Boolean = {
-      val (column, aggType) = agg match {
-        case max: Max => (max.column, "max")
-        case min: Min => (min.column, "min")
-        case _ =>
-          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
-      }
-
-      if (isPartitionCol(column)) {
-        // don't push down partition column, footer doesn't have max/min for partition column
-        return false
-      }
-      val structField = getStructFieldForCol(column)
-
-      structField.dataType match {
-        // not push down complex type
-        // not push down Timestamp because INT96 sort order is undefined,
-        // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex type.
+        // Not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96.
+        // Not push down Binary type as Parquet can truncate the statistics.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       Looks good. Thanks for adding this.






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945031710


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48805/
   




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953303074


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49145/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953232175


   **[Test build #144676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144676/testReport)** for PR 34298 at commit [`8c7c617`](https://github.com/apache/spark/commit/8c7c6178ae145190a6fae6fd2024946578362312).




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r736874359



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -960,6 +960,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
+      " down to ORC for optimization. MAX/MIN for complex types can't be pushed down")

Review comment:
       @sunchao - yes, updated the doc.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
##########
@@ -37,35 +38,65 @@ case class OrcScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
+    pushedAggregate: Option[Aggregation] = None,
     pushedFilters: Array[Filter],
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so file should be not split across multiple tasks.
+    pushedAggregate.isEmpty

Review comment:
       @huaxingao - cool, then I can address it for Parquet in a follow-up PR; it's not urgent anyway.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))

Review comment:
       @huaxingao - thanks for checking. Removed it for ORC. I can do another PR for Parquet to keep this PR's review faster, but if you are already working on the Parquet code path, feel free to go ahead.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNames: Set[String],
+      dataFilters: Seq[Expression]): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNames.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      structField.dataType match {
+        // not push down complex type
+        // not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType.
+        // not push down for ORC with same reason.
+        case BooleanType | ByteType | ShortType | IntegerType
+             | LongType | FloatType | DoubleType | DateType =>
+          finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+          true
+        case _ =>
+          false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.aggregateExpressions.foreach {
+      case max: Max =>
+        if (!processMinOrMax(max)) return None
+      case min: Min =>
+        if (!processMinOrMax(min)) return None
+      case count: Count =>
+        if (count.column.fieldNames.length != 1 || count.isDistinct) return None
+        finalSchema =
+          finalSchema.add(StructField(s"count(" + count.column.fieldNames.head + ")", LongType))
+      case _: CountStar =>
+        finalSchema = finalSchema.add(StructField("count(*)", LongType))
+      case _ =>
+        return None
+    }
+
+    Some(finalSchema)
+  }
+
+  /**
+   * Check if two Aggregation `a` and `b` is equal or not.
+   */
+  def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {

Review comment:
       @viirya - I think so; `Aggregation` is not a `QueryPlan` here, so `sameResult` is not available. Btw, this was introduced in https://github.com/apache/spark/pull/33639, and I am just refactoring it here.
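       For illustration, a minimal sketch of a structural comparison that works without `sameResult`, using only the `describe` method already shown in the diff; not necessarily the exact implementation:

           def equivalentAggregations(a: Aggregation, b: Aggregation): Boolean = {
             // Compare aggregate expressions and group-by columns by their
             // canonical string form, ignoring ordering.
             a.aggregateExpressions.map(_.describe).sorted
               .sameElements(b.aggregateExpressions.map(_.describe).sorted) &&
             a.groupByColumns.map(_.describe).sorted
               .sameElements(b.groupByColumns.map(_.describe).sorted)
           }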

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +

Review comment:
       > Why not use DateColumnStatistics instead of ${statistics.getClass.getName}?
   
   Sorry if it's not clear, but this is the code path for `case _`, not `case s: DateColumnStatistics`. I want to print out the class name of the statistics we do not handle.

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. The flatten array stores all data types (including nested types) in
+ * tree pre-ordering. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnStatistics {
+  private final ColumnStatistics statistics;
+  private final List<OrcColumnStatistics> children;

Review comment:
       @viirya - sure, added some comments and an example. 
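       For illustration, a rough sketch of the flat-array-to-tree conversion described above, walking the Spark schema in the same pre-order in which ORC lays out its statistics; the single-argument constructor and the `add` method on `OrcColumnStatistics` are assumed for this sketch and may differ from the actual API:

           import java.util.Queue
           import org.apache.orc.ColumnStatistics
           import org.apache.spark.sql.types._

           def convert(dt: DataType, flat: Queue[ColumnStatistics]): OrcColumnStatistics = {
             // Consume this node's statistics entry, then recurse into any children
             // in the same pre-order the flat array was written in.
             val node = new OrcColumnStatistics(flat.remove())
             dt match {
               case st: StructType =>
                 st.fields.foreach(f => node.add(convert(f.dataType, flat)))
               case at: ArrayType =>
                 node.add(convert(at.elementType, flat))
               case mt: MapType =>
                 node.add(convert(mt.keyType, flat))
                 node.add(convert(mt.valueType, flat))
               case _ => // primitive leaf: no children
             }
             node
           }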

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum

Review comment:
       @sunchao - great catch! Added handling for an empty file (0 values/rows): we should return null instead. Also added a unit test for the empty-file case in `FileSourceAggregatePushDownSuite/"aggregate push down - different data types"`, thanks.
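
       A minimal sketch of that guard (hypothetical helper name, not the PR's exact code): when a column's statistics report zero values, MIN/MAX has no meaningful answer, so return null rather than a bogus extreme.

```scala
import org.apache.orc.{ColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics}

// Sketch only: return null for MIN/MAX when the column has no (non-null) values,
// e.g. an empty ORC file or an all-null column.
def minMaxOrNull(statistics: ColumnStatistics, isMax: Boolean): Any = {
  if (statistics.getNumberOfValues == 0) {
    null
  } else {
    statistics match {
      case s: IntegerColumnStatistics => if (isMax) s.getMaximum else s.getMinimum
      case s: DoubleColumnStatistics => if (isMax) s.getMaximum else s.getMinimum
      case other => throw new IllegalArgumentException(
        s"Unsupported ORC column statistics type: ${other.getClass.getName}")
    }
  }
}
```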

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also stores number
+          // of all values (null and non-null) separately.
+          val nonNullRowsCount = if (isPartitionColumn) {
+            columnsStatistics.getStatistics.getNumberOfValues

Review comment:
       @sunchao - because a partition column can never be NULL for any row (same reasoning as for Parquet in https://github.com/apache/spark/pull/33639#discussion_r725682376), every row should be counted for a partition column. Also updated the unit test `FileSourceAggregatePushDownSuite."Count(partition column): push down"` to cover null values.
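
       A small sketch of that distinction (hypothetical helper, assuming the names discussed in this thread): COUNT on a partition column uses the file-level row count because partition values are never NULL, while COUNT on a data column uses that column's non-null value count.

```scala
import org.apache.orc.ColumnStatistics

// Sketch only: fileRowCount would come from e.g. Reader.getNumberOfRows,
// columnStats from the footer statistics of the counted data column.
def pushedDownCount(
    fileRowCount: Long,
    columnStats: ColumnStatistics,
    isPartitionColumn: Boolean): Long = {
  if (isPartitionColumn) {
    fileRowCount // a partition column is never NULL, so every row counts
  } else {
    columnStats.getNumberOfValues // non-null values only, matching COUNT(col) semantics
  }
}
```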

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -175,24 +175,26 @@ case class ParquetPartitionReaderFactory(
     } else {
       new PartitionReader[ColumnarBatch] {
         private var hasNext = true
-        private val row: ColumnarBatch = {
+        private val batch: ColumnarBatch = {
           val footer = getFooter(file)
           if (footer != null && footer.getBlocks.size > 0) {
-            ParquetUtils.createAggColumnarBatchFromFooter(footer, file.filePath, dataSchema,
-              partitionSchema, aggregation.get, readDataSchema, enableOffHeapColumnVector,
+            val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath,
+              dataSchema, partitionSchema, aggregation.get, readDataSchema,
               getDatetimeRebaseMode(footer.getFileMetaData), isCaseSensitive)
+            AggregatePushDownUtils.convertAggregatesRowToBatch(
+              row, readDataSchema, enableOffHeapColumnVector)

Review comment:
       @sunchao - makes sense to me; this is also the existing behavior of `ParquetPartitionReaderFactory.createParquetVectorizedReader()`. Updated.
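
       For reference, a rough sketch of that row-to-batch conversion (assuming Spark's internal `RowToColumnConverter` and `OnHeapColumnVector` APIs; not necessarily the exact shape of `AggregatePushDownUtils.convertAggregatesRowToBatch`):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Convert the single aggregate-result row into a one-row ColumnarBatch.
def toSingleRowBatch(row: InternalRow, schema: StructType): ColumnarBatch = {
  val converter = new RowToColumnConverter(schema)
  val vectors = OnHeapColumnVector.allocateColumns(1, schema)
  converter.convert(row, vectors.toArray)
  new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], 1)
}
```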

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +

Review comment:
       @viirya - sorry, fixed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
       @sunchao - normally it should have them. Added code to throw an actionable exception in case the file's statistics are not valid.
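
       A sketch of what an actionable failure could look like (hypothetical helper and message wording, not the PR's exact code):

```scala
import org.apache.orc.ColumnStatistics
import org.apache.spark.SparkException

// Fail fast with a hint instead of silently producing wrong aggregate results
// when the footer carries no usable column statistics.
def requireValidStatistics(statistics: Array[ColumnStatistics], filePath: String): Unit = {
  if (statistics == null || statistics.isEmpty) {
    throw new SparkException(
      s"Cannot find valid column statistics in the ORC footer of $filePath. " +
        "Disable spark.sql.orc.aggregatePushdown or rewrite the file with a " +
        "writer that produces footer statistics.")
  }
}
```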

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +

Review comment:
       @sunchao - added.






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-950040509


   **[Test build #144547 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144547/testReport)** for PR 34298 at commit [`3341440`](https://github.com/apache/spark/commit/334144026416ce81f6e9cfce76b4d5e92a71fa93).
    * This patch **fails SparkR unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-950040725


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144547/
   




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r735996208



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))

Review comment:
       I did a quick test. We don't need this `isCaseSensitive`. I can clean this up if you don't have time for this. 






[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r735984497



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))

Review comment:
       When reading this code again, I realized that we may not need the `isCaseSensitive` because the column name has already been normalized. We probably don't need to pass down the `isCaseSensitive` at all. I will double check this.






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952444505


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49102/
   




[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953219162


   Rebased to latest master




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-944847640








[GitHub] [spark] viirya closed pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya closed pull request #34298:
URL: https://github.com/apache/spark/pull/34298


   




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738046042



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -960,6 +960,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
+      "COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
+      "type. For COUNT, support all data types.")

Review comment:
       Sounds good. Let's have a detailed doc later on. 
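
       For example (illustrative only; `orc_table` is a hypothetical table name and an active `spark` session is assumed), a query shape that becomes eligible once the flag is on:

```scala
// Enable ORC aggregate push down, then run a MIN/MAX/COUNT-only query
// with no GROUP BY or filter so it can be answered from footer statistics.
spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
spark.sql("SELECT MIN(id), MAX(id), COUNT(*) FROM orc_table").show()
```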






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953232175


   **[Test build #144676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144676/testReport)** for PR 34298 at commit [`8c7c617`](https://github.com/apache/spark/commit/8c7c6178ae145190a6fae6fd2024946578362312).




[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952424889


   **[Test build #144632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144632/testReport)** for PR 34298 at commit [`9b8b9ef`](https://github.com/apache/spark/commit/9b8b9ef7efa3ab055edced6d039cc98867f3483f).




[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r731103956



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,

Review comment:
       Is it in pre-order, and does it flatten all the nested types? Might be worth mentioning here.

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcFooterReader.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.spark.sql.types.*;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * `OrcFooterReader` is a util class which encapsulates the helper
+ * methods of reading ORC file footer.
+ */
+public class OrcFooterReader {
+
+  /**
+   * Read the columns statistics from ORC file footer.
+   *
+   * @param orcReader the reader to read ORC file footer.
+   * @return Statistics for all columns in the file.
+   */
+  public static OrcColumnsStatistics readStatistics(Reader orcReader) {
+    TypeDescription orcSchema = orcReader.getSchema();
+    ColumnStatistics[] orcStatistics = orcReader.getStatistics();
+    StructType dataType = OrcUtils.toCatalystSchema(orcSchema);

Review comment:
       nit: maybe rename `dataType` to `sparkSchema`.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values

Review comment:
       I got confused reading this: does `getNumberOfValues` have two different behaviors depending on whether the column it represents is top-level or not?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also returns
+          // number of non-null and null values for its top-level
+          // ColumnStatistics.getNumberOfValues().
+          val nonNullRowsCount = if (isPartitionColumn) {
+            val topLevelStatistics = columnsStatistics.getStatistics
+            if (topLevelStatistics.hasNull) {
+              throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +
+                s"values: $topLevelStatistics. Aggregate expression: $count")
+            }
+            topLevelStatistics.getNumberOfValues
+          } else {
+            getColumnStatistics(columnName).getNumberOfValues
+          }
+          new LongWritable(nonNullRowsCount)
+        case (_: CountStar, _) =>
+          // Count(*) includes both null and non-null values.
+          val topLevelStatistics = columnsStatistics.getStatistics
+          if (topLevelStatistics.hasNull) {
+            throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +

Review comment:
       Not sure why we should throw an exception here - doesn't `count(*)` include NULLs?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
##########
@@ -37,35 +38,65 @@ case class OrcScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
+    pushedAggregate: Option[Aggregation] = None,
     pushedFilters: Array[Filter],
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so file should be not split across multiple tasks.
+    pushedAggregate.isEmpty

Review comment:
       This seems like a better approach than what we are doing on the Parquet side, cc @huaxingao. Also, maybe we should change how we measure file weight when combining tasks for aggregate push down, since computing stats is much cheaper and we can combine multiple large files into a single task.
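
       To sketch the file-weight idea (purely hypothetical, not something implemented in this PR): when only footers are read, a file's scheduling cost could be capped at a small constant so that many large files still combine into one task.

```scala
// Hypothetical cost function for combining file partitions under aggregate push down.
def effectiveFileSize(fileSizeInBytes: Long, aggregatePushedDown: Boolean): Long = {
  // Treat a footer-only read as a small fixed amount of work (assumed 4 MB here).
  val footerOnlyCost = 4L * 1024 * 1024
  if (aggregatePushedDown) math.min(fileSizeInBytes, footerOnlyCost) else fileSizeInBytes
}
```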

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],

Review comment:
       nit: maybe `partitionNames`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -87,84 +86,45 @@ case class ParquetScanBuilder(
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
   override def pushAggregation(aggregation: Aggregation): Boolean = {
-
-    def getStructFieldForCol(col: NamedReference): StructField = {
-      schema.nameToField(col.fieldNames.head)
-    }
-
-    def isPartitionCol(col: NamedReference) = {
-      partitionNameSet.contains(col.fieldNames.head)
+    if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
+      return false
     }
 
-    def processMinOrMax(agg: AggregateFunc): Boolean = {
-      val (column, aggType) = agg match {
-        case max: Max => (max.column, "max")
-        case min: Min => (min.column, "min")
-        case _ =>
-          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
-      }
-
-      if (isPartitionCol(column)) {
-        // don't push down partition column, footer doesn't have max/min for partition column
-        return false
-      }
-      val structField = getStructFieldForCol(column)
-
-      structField.dataType match {
-        // not push down complex type
-        // not push down Timestamp because INT96 sort order is undefined,
-        // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex type.
+        // Not push down Timestamp because INT96 sort order is undefined,
+        // Parquet doesn't return statistics for INT96.
+        // Not push down Binary type as Parquet can truncate the statistics.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       We should put `StringType` here too.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
##########
@@ -58,4 +72,35 @@ case class OrcScanBuilder(
       Array.empty[Filter]
     }
   }
+
+  override def pushAggregation(aggregation: Aggregation): Boolean = {
+    if (!sparkSession.sessionState.conf.orcAggregatePushDown) {
+      return false
+    }
+
+    def isAllowedTypeForMinMaxAggregate(dataType: DataType): Boolean = {
+      dataType match {
+        // Not push down complex and Timestamp type.
+        // Not push down Binary type as ORC does not write min/max statistics for it.
+        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>

Review comment:
       Hm, should we add `StringType` here too? How does ORC store stats for long strings?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also returns
+          // number of non-null and null values for its top-level
+          // ColumnStatistics.getNumberOfValues().
+          val nonNullRowsCount = if (isPartitionColumn) {
+            val topLevelStatistics = columnsStatistics.getStatistics
+            if (topLevelStatistics.hasNull) {
+              throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +

Review comment:
       hm, does this mean we have an invalid ORC file here, or is it a valid file that Spark just can't handle?

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnsStatistics {

Review comment:
       `OrcColumnsStatistics` -> `OrcColumnStatistics`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value

Review comment:
       nit: `value` seems redundant

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +

Review comment:
       nit: a space is missing at the end of the string literal
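
       i.e. something like (illustrative only):

           s"getMaxFromColumnStatistics should not take type $dataType " +
             "for IntegerColumnStatistics"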

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {

Review comment:
       nit: `sparkSession` is unused.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,

Review comment:
       nit: `filePath` is unused

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>

Review comment:
       is it OK to push down string for ORC? I remember you mentioned there are some issues similar to Parquet's?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMaxFromColumnStatistics should not take type $dataType" +
+                "for DoubleColumnStatistics")
+          }
+        case s: DecimalColumnStatistics =>
+          new HiveDecimalWritable(if (isMax) s.getMaximum else s.getMinimum)
+        case s: StringColumnStatistics =>
+          new Text(if (isMax) s.getMaximum else s.getMinimum)
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+          value
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          val value = getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+          value
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also returns
+          // number of non-null and null values for its top-level
+          // ColumnStatistics.getNumberOfValues().
+          val nonNullRowsCount = if (isPartitionColumn) {
+            val topLevelStatistics = columnsStatistics.getStatistics
+            if (topLevelStatistics.hasNull) {
+              throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +

Review comment:
       I think we should also give an informative error message to the users so they know how to fall back.
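
       For example (just a sketch; the exact wording is an assumption, `filePath` comes from the reader
       context in this PR), the message could point users at the config added here so they know how to
       fall back:

           throw new SparkException(
             s"Cannot compute pushed-down aggregates from the ORC footer of file $filePath " +
             "because its column statistics are missing or report NULL values. Either rewrite " +
             "the file with valid statistics, or set spark.sql.orc.aggregatePushdown=false to " +
             "fall back to reading the data and aggregating in Spark.")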






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-949960083


   **[Test build #144547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144547/testReport)** for PR 34298 at commit [`3341440`](https://github.com/apache/spark/commit/334144026416ce81f6e9cfce76b4d5e92a71fa93).




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-944859617








[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945032390


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48805/
   




[GitHub] [spark] viirya commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r730201130



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnsStatistics {

Review comment:
       I'm curious why this is in java?

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnsStatistics.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. This is used for aggregate push down in ORC.
+ */
+public class OrcColumnsStatistics {

Review comment:
       I'm curious why this is in Java? It doesn't look like it's meant to be an API open to others.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col

Review comment:
       @huaxingao Didn't you already add this support? No?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/AggregatePushDownUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation, Count, CountStar, Max, Min}
+import org.apache.spark.sql.execution.RowToColumnConverter
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Utility class for aggregate push down to Parquet and ORC.
+ */
+object AggregatePushDownUtils {
+
+  /**
+   * Get the data schema for aggregate to be pushed down.
+   */
+  def getSchemaForPushedAggregation(
+      aggregation: Aggregation,
+      schema: StructType,
+      partitionNameSet: Set[String],
+      dataFilters: Seq[Expression],
+      isAllowedTypeForMinMaxAggregate: DataType => Boolean,
+      sparkSession: SparkSession): Option[StructType] = {
+
+    var finalSchema = new StructType()
+
+    def getStructFieldForCol(col: NamedReference): StructField = {
+      schema.apply(col.fieldNames.head)
+    }
+
+    def isPartitionCol(col: NamedReference) = {
+      partitionNameSet.contains(col.fieldNames.head)
+    }
+
+    def processMinOrMax(agg: AggregateFunc): Boolean = {
+      val (column, aggType) = agg match {
+        case max: Max => (max.column, "max")
+        case min: Min => (min.column, "min")
+        case _ =>
+          throw new IllegalArgumentException(s"Unexpected type of AggregateFunc ${agg.describe}")
+      }
+
+      if (isPartitionCol(column)) {
+        // don't push down partition column, footer doesn't have max/min for partition column
+        return false
+      }
+      val structField = getStructFieldForCol(column)
+
+      if (isAllowedTypeForMinMaxAggregate(structField.dataType)) {
+        finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
+        true
+      } else {
+        false
+      }
+    }
+
+    if (aggregation.groupByColumns.nonEmpty || dataFilters.nonEmpty) {
+      // Parquet/ORC footer has max/min/count for columns
+      // e.g. SELECT COUNT(col1) FROM t
+      // but footer doesn't have max/min/count for a column if max/min/count
+      // are combined with filter or group by
+      // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+      //      SELECT COUNT(col1) FROM t GROUP BY col2
+      // Todo: 1. add support if groupby column is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36646)
+      //       2. add support if filter col is partition col
+      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      return None
+    }
+
+    aggregation.groupByColumns.foreach { col =>

Review comment:
       Hmm, isn't `aggregation.groupByColumns` guaranteed to be empty at this point?






[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-944843872


   cc @huaxingao, @sunchao, @viirya, @cloud-fan and @dongjoon-hyun could you help take a look when you have time? Thanks!




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954181760


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49194/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954315584


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144724/
   




[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954103543


   Addressed all comments from @huaxingao. @viirya do you wanna take another look? Thanks.




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953313863


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49145/
   




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738039813



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnStatistics.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import org.apache.orc.ColumnStatistics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
+ *
+ * Because ORC {@link ColumnStatistics}s are stored as an flatten array in ORC file footer,
+ * this class is used to covert ORC {@link ColumnStatistics}s from array to nested tree structure,
+ * according to data types. The flatten array stores all data types (including nested types) in
+ * tree pre-ordering. This is used for aggregate push down in ORC.
+ *
+ * For nested data types (array, map and struct), the sub-field statistics are stored recursively
+ * inside parent column's `children` field. Here is an example of `OrcColumnStatistics`:

Review comment:
       nit: I think in javadoc we are supposed to use {@code} instead of backquotes (e.g. {@code children} rather than `children`). There are a couple of other places that use backquotes too.






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954155020


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49194/
   




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952461662


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49102/
   




[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737763308



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,124 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      filePath: String,

Review comment:
       @sunchao - added back now, to show an actionable message when file does not have statistics in line 407.






[GitHub] [spark] huaxingao commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945045872


   @c21 Thanks for working on this! I took a quick look, overall it is good. I will find time to take a closer look.




[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-944847640


   **[Test build #144317 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144317/testReport)** for PR 34298 at commit [`73627ed`](https://github.com/apache/spark/commit/73627eda63cf754240d2dce8b07f6f45ae71c512).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-944859617








[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945047639


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144326/
   




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-945047475


   **[Test build #144326 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144326/testReport)** for PR 34298 at commit [`1f36f12`](https://github.com/apache/spark/commit/1f36f12fdfb99bd737f02859da19630e48c09956).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] c21 commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-946391601


   Thank you @sunchao for the review! Will update shortly.




[GitHub] [spark] huaxingao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r738038588



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -960,6 +960,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
+    .doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
+      "COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
+      "type. For COUNT, support all data types.")

Review comment:
       We support byte, short and double for MIN/MAX too?






[GitHub] [spark] c21 commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737915227



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)

Review comment:
    Taking the Spark write code path as an example here.
    
    Spark uses `OrcOutputWriter` to write ORC files, and internally it depends on ORC's `OrcMapreduceRecordWriter` to do the actual write.
    
    The writing of file statistics happens during `OrcOutputWriter.close()` -> `OrcMapreduceRecordWriter.close()` -> `WriterImpl.close()` -> `WriterImpl.writeFooter()` -> `TreeWriter.writeFileStatistics()`. So writing file statistics is a step of writing the file footer, and an exception is thrown if they cannot be written.
    
    `TreeWriter` contains an individual writer per column. Let's take `IntegerTreeWriter` as an example for writing an int column.
    
    `TreeWriterBase` (the superclass of `IntegerTreeWriter`) maintains real-time per-row-batch statistics (`TreeWriterBase.indexStatistics`), per-stripe statistics (`TreeWriterBase.stripeColStatistics`) and per-file statistics (`TreeWriterBase.fileStatistics`). `TreeWriterBase.writeBatch()` updates the count statistics. [`IntegerTreeWriter.writeBatch()` updates min/max statistics for the int column when writing each row](https://github.com/apache/orc/blob/main/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java#L88).
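    
    A minimal sketch of that behavior with the plain ORC Java API (assumptions: a local path and the
    non-shaded orc-core artifact, since the shaded one relocates the vector classes). The per-column
    statistics are finalized when the writer is closed, and can be read straight back from the footer
    without scanning any rows:
    
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.Path
        import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector
        import org.apache.orc.{IntegerColumnStatistics, OrcFile, TypeDescription}
    
        val conf = new Configuration()
        val path = new Path("/tmp/stats_demo.orc")
        val schema = TypeDescription.fromString("struct<x:int>")
    
        val writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(schema))
        val batch = schema.createRowBatch()
        val col = batch.cols(0).asInstanceOf[LongColumnVector]
        (1 to 5).foreach { i =>
          col.vector(batch.size) = i.toLong
          batch.size += 1
        }
        writer.addRowBatch(batch)
        writer.close()  // file statistics are written here, as part of the footer
    
        val reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
        reader.getStatistics.foreach {
          case stats: IntegerColumnStatistics =>
            println(s"min=${stats.getMinimum}, max=${stats.getMaximum}, " +
              s"nonNull=${stats.getNumberOfValues}")
          case _ => // the first entry is the root struct's statistics; skip it
        }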






[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952424889


   **[Test build #144632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144632/testReport)** for PR 34298 at commit [`9b8b9ef`](https://github.com/apache/spark/commit/9b8b9ef7efa3ab055edced6d039cc98867f3483f).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-953313863


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49145/
   




[GitHub] [spark] sunchao commented on a change in pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #34298:
URL: https://github.com/apache/spark/pull/34298#discussion_r737026407



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
##########
@@ -377,4 +381,106 @@ object OrcUtils extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to ORC, we don't need to read data
+   * from ORC and aggregate at Spark layer. Instead we want to get the partial aggregates
+   * (Max/Min/Count) result using the statistics information from ORC file footer, and then
+   * construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  def createAggInternalRowFromFooter(
+      reader: Reader,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    require(aggregation.groupByColumns.length == 0,
+      s"aggregate $aggregation with group-by column shouldn't be pushed down")
+    val columnsStatistics = OrcFooterReader.readStatistics(reader)
+
+    // Get column statistics with column name.
+    def getColumnStatistics(columnName: String): ColumnStatistics = {
+      val columnIndex = dataSchema.fieldNames.indexOf(columnName)
+      columnsStatistics.get(columnIndex).getStatistics
+    }
+
+    // Get Min/Max statistics and store as ORC `WritableComparable` format.
+    def getMinMaxFromColumnStatistics(
+        statistics: ColumnStatistics,
+        dataType: DataType,
+        isMax: Boolean): WritableComparable[_] = {
+      statistics match {
+        case s: BooleanColumnStatistics =>
+          val value = if (isMax) s.getTrueCount > 0 else !(s.getFalseCount > 0)
+          new BooleanWritable(value)
+        case s: IntegerColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case ByteType => new ByteWritable(value.toByte)
+            case ShortType => new ShortWritable(value.toShort)
+            case IntegerType => new IntWritable(value.toInt)
+            case LongType => new LongWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMinMaxFromColumnStatistics should not take type $dataType " +
+              "for IntegerColumnStatistics")
+          }
+        case s: DoubleColumnStatistics =>
+          val value = if (isMax) s.getMaximum else s.getMinimum
+          dataType match {
+            case FloatType => new FloatWritable(value.toFloat)
+            case DoubleType => new DoubleWritable(value)
+            case _ => throw new IllegalArgumentException(
+              s"getMinMaxFromColumnStatistics should not take type $dataType " +
+                "for DoubleColumnStatistics")
+          }
+        case s: DateColumnStatistics =>
+          new DateWritable(
+            if (isMax) s.getMaximumDayOfEpoch.toInt else s.getMinimumDayOfEpoch.toInt)
+        case _ => throw new IllegalArgumentException(
+          s"getMinMaxFromColumnStatistics should not take ${statistics.getClass.getName}: " +
+            s"$statistics as the ORC column statistics")
+      }
+    }
+
+    val aggORCValues: Seq[WritableComparable[_]] =
+      aggregation.aggregateExpressions.zipWithIndex.map {
+        case (max: Max, index) =>
+          val columnName = max.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = true)
+        case (min: Min, index) =>
+          val columnName = min.column.fieldNames.head
+          val statistics = getColumnStatistics(columnName)
+          val dataType = aggSchema.apply(index).dataType
+          getMinMaxFromColumnStatistics(statistics, dataType, isMax = false)
+        case (count: Count, _) =>
+          val columnName = count.column.fieldNames.head
+          val isPartitionColumn = partitionSchema.fields
+            .map(PartitioningUtils.getColName(_, isCaseSensitive))
+            .contains(columnName)
+          // NOTE: Count(columnName) doesn't include null values.
+          // org.apache.orc.ColumnStatistics.getNumberOfValues() returns number of non-null values
+          // for ColumnStatistics of individual column. In addition to this, ORC also stores number
+          // of all values (null and non-null) separately.
+          val nonNullRowsCount = if (isPartitionColumn) {
+            columnsStatistics.getStatistics.getNumberOfValues

Review comment:
       I see, thanks!






[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954181804


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49194/
   




[GitHub] [spark] viirya commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954318091


   Thanks! Merging to master.




[GitHub] [spark] SparkQA commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-954314787


   **[Test build #144724 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144724/testReport)** for PR 34298 at commit [`d85d4ba`](https://github.com/apache/spark/commit/d85d4ba720a0936ee636320be8cfefe9a72565f9).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-949960083


   **[Test build #144547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144547/testReport)** for PR 34298 at commit [`3341440`](https://github.com/apache/spark/commit/334144026416ce81f6e9cfce76b4d5e92a71fa93).




[GitHub] [spark] AmplabJenkins commented on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952473461


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49102/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34298: [SPARK-34960][SQL] Aggregate push down for ORC

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34298:
URL: https://github.com/apache/spark/pull/34298#issuecomment-952473461


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49102/
   

