Posted to reviews@spark.apache.org by "sunchao (via GitHub)" <gi...@apache.org> on 2023/11/01 20:11:34 UTC

[PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

sunchao opened a new pull request, #43629:
URL: https://github.com/apache/spark/pull/43629

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   Also update partition statistics (e.g., total size in bytes, row count) with the `ANALYZE TABLE` command.
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   Currently, when an `ANALYZE TABLE <tableName>` command is triggered against a partitioned table, only table-level stats are updated, not partition stats. Spark users who want to update the latter have to use a different, more verbose syntax: `ANALYZE TABLE <tableName> PARTITION(<partitionColumns>)`.
   
   Given that `ANALYZE TABLE` already calculates the total size of all partitions internally, it makes sense to also update the partition stats from that result. This way, Spark users do not need to remember two different syntaxes.
   
   In addition, when `ANALYZE TABLE` performs a scan (i.e., `NOSCAN` is NOT specified), we can also calculate the row count for each partition and update its stats accordingly, as sketched below.
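
   For illustration, a minimal sketch of the syntaxes discussed above, as issued from a Spark session (`sales` is a placeholder partitioned table):
   
   ```
   // Table-level form: previously this updated only table-level stats.
   spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
   
   // NOSCAN variant: collects sizes only, no row counts.
   spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS NOSCAN")
   
   // Per-partition form: more verbose, but also updates partition stats.
   spark.sql("ANALYZE TABLE sales PARTITION(ds) COMPUTE STATISTICS")
   ```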
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   Yes: with this change, the `ANALYZE TABLE` command on a partitioned table also updates the partition stats.
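
   One way to observe the new behavior, sketched with the internal catalog API used by this PR's test (`tableName` and the `ds` value are placeholders):
   
   ```
   import org.apache.spark.sql.catalyst.TableIdentifier
   
   spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
   
   // With the feature enabled, partition-level stats such as sizeInBytes
   // are now populated as well.
   val partStats = spark.sessionState.catalog
     .getPartition(TableIdentifier(tableName), Map("ds" -> "2010-01-01"))
     .stats
   assert(partStats.exists(_.sizeInBytes > 0))
   ```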
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   Added a unit test for this feature.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   
   No


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387720552


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   Got it.



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387340391


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ANALYZE_PARTITION_STATS_ENABLED =
+    buildConf("spark.sql.statistics.update.partitionStats.enabled")

Review Comment:
   1. Do we have other configurations under `spark.sql.statistics.update.*`? We should not add a new namespace for a single configuration. Shall we rename like the following?
   ```
   - spark.sql.statistics.update.partitionStats.enabled
   + spark.sql.statistics.updatePartitionStats.enabled
   ```
   
   2. Also, the current config name looks too general. Can we revise the config name to give some idea about `ANALYZE TABLE` syntax?
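
   For reference, a sketch of the declaration under the revised naming eventually adopted in the PR (the conf string and doc text are quoted later in this thread; the default shown is an assumption, not confirmed in this excerpt):
   
   ```
   val UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED =
     buildConf("spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled")
       .doc("When this config is enabled, Spark will also update partition " +
         "statistics in analyze table command (i.e., ANALYZE TABLE .. COMPUTE " +
         "STATISTICS [NOSCAN]).")
       .booleanConf
       .createWithDefault(false) // assumed default; not shown in this excerpt
   ```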



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387354165


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ANALYZE_PARTITION_STATS_ENABLED =
+    buildConf("spark.sql.statistics.update.partitionStats.enabled")

Review Comment:
   +1 for the new name.



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387500112


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   It's now used in `CommandUtils`, so I moved it there and switched this class to use the qualified `CommandUtils.calculateRowCountsPerPartition`.
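
   For context, the relocated helper's shape can be reconstructed from the call sites in this diff (a sketch only: the return type is inferred from how `rowCounts` is used, and the body is elided):
   
   ```
   // In CommandUtils (sketch; body moved verbatim from AnalyzePartitionCommand):
   def calculateRowCountsPerPartition(
       sparkSession: SparkSession,
       tableMeta: CatalogTable,
       partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
     ???
   }
   ```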



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -86,19 +91,31 @@ object CommandUtils extends Logging {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
       logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
-      val paths = partitions.map(_.storage.locationUri)
-      val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
-      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
-        newStats.map(_ => p.copy(stats = newStats))
-      }
+      val (sizes, newPartitions) = calculatePartitionStats(spark, catalogTable, partitions,
+        partitionRowCount)
       (sizes.sum, newPartitions)

Review Comment:
   Yea we can use `sizes.sum` and save a line here I think



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            val expectedRowCount = 25
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }

Review Comment:
   Hmm I actually like the current way better since it has less duplicated code, e.g., `partitionDates.foreach`, `val partStats = queryStats(ds)`.



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387343974


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ANALYZE_PARTITION_STATS_ENABLED =
+    buildConf("spark.sql.statistics.update.partitionStats.enabled")

Review Comment:
   Hmm, how about `spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled`? It is a bit long but can fully express the intention 😂 



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -372,54 +372,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       partition.stats
     }
 
-    withTable(tableName) {
-      withTempPath { path =>
-        // Create a table with 3 partitions all located under a single top-level directory 'path'
-        sql(
-          s"""
-             |CREATE TABLE $tableName (key STRING, value STRING)
-             |USING hive
-             |PARTITIONED BY (ds STRING)
-             |LOCATION '${path.toURI}'
-             """.stripMargin)
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED.key -> partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
 
-        val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
 
-        partitionDates.foreach { ds =>
-          sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
-          sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
-              .format("parquet").save(s"$path/ds=$ds")
-        }
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * from src").write.mode(SaveMode.Overwrite)

Review Comment:
   oops will revert this change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1804237479

   Feel free to merge after adjusting minor things, @sunchao . Thank you!


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1804860767

   Merged to master, thanks all!!


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "patsukp-db (via GitHub)" <gi...@apache.org>.
patsukp-db commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1393596801


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -37,6 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.Utils

Review Comment:
   This causes a compile error.
   Perhaps we can do
   
   ```
   import org.apache.spark.util.collection.{Utils => ColUtils}
   ```
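
   A minimal sketch of that aliasing pattern (assuming the clash is with another `Utils` already in scope in `CommandUtils`, e.g. `org.apache.spark.util.Utils`; the call sites below are illustrative):
   
   ```
   import org.apache.spark.util.Utils                           // already in scope
   import org.apache.spark.util.collection.{Utils => ColUtils}  // alias avoids the ambiguity
   
   // Call sites can then disambiguate explicitly, e.g.:
   //   Utils.bytesToString(sizeInBytes)  vs.  ColUtils.toMap(keys, values)
   ```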



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1790038555

   > AFAIK, users are used to using REPAIR TABLE to update partition statistics.
   
   Hmm, sorry, I'm not aware that this is a common pattern among Spark users. However, `REPAIR TABLE` seems a bit more expensive than `ANALYZE TABLE`, since it first needs to list all the partitions under the table directory and then process and validate them. In addition, it doesn't seem to be able to update the row count for each partition either.
   
   > I think `ANALYZE TABLE` should update the whole statistics instead of partition statistics. How to only update the table whole statistics without partition statistics if we accepted this PR?
   
   Yeah, that's a valid question. I wonder, though, why users would want to update only table stats but not partition stats: is it because updating the latter is significantly more expensive? In the `ANALYZE TABLE .. COMPUTE STATISTICS NOSCAN` case, the current implementation already collects the size in bytes for each partition, and we just need one extra HMS call (`alterPartitions`) to update the stats for these partitions.
   
   Alternatively, maybe we can introduce a new syntax `ANALYZE TABLE <tableName> PARTITIONS COMPUTE STATISTICS [NOSCAN]` to update both table and partition stats? 
   
   
   
   
   


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387522474


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   I think we shouldn't move `calculateRowCountsPerPartition` if it is used only once.



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387343974


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ANALYZE_PARTITION_STATS_ENABLED =
+    buildConf("spark.sql.statistics.update.partitionStats.enabled")

Review Comment:
   Hmm, how about `spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled`? it is a bit long but can fully express the intention 😂 



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387395341


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -86,19 +91,31 @@ object CommandUtils extends Logging {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
       logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
-      val paths = partitions.map(_.storage.locationUri)
-      val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
-      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
-        newStats.map(_ => p.copy(stats = newStats))
-      }
+      val (sizes, newPartitions) = calculatePartitionStats(spark, catalogTable, partitions,
+        partitionRowCount)
       (sizes.sum, newPartitions)
     }
     logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
       s" the total size for table ${catalogTable.identifier}.")
     (totalSize, newPartitions)
   }
 
+  def calculatePartitionStats(
+      spark: SparkSession,
+      catalogTable: CatalogTable,
+      partitions: Seq[CatalogTablePartition],
+      partitionRowCount: Option[Map[TablePartitionSpec, BigInt]] = None):
+  (Seq[Long], Seq[CatalogTablePartition]) = {
+    val paths = partitions.map(_.storage.locationUri)
+    val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
+    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+      val newRowCount = partitionRowCount.flatMap(_.get(p.spec))
+      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
+      newStats.map(_ => p.copy(stats = newStats))
+    }
+    (sizes, newPartitions)

Review Comment:
   ```suggestion
       (sizes.sum, newPartitions)
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   Is `calculateRowCountsPerPartition` shared with other caller?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -86,19 +91,31 @@ object CommandUtils extends Logging {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
       logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
-      val paths = partitions.map(_.storage.locationUri)
-      val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
-      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
-        newStats.map(_ => p.copy(stats = newStats))
-      }
+      val (sizes, newPartitions) = calculatePartitionStats(spark, catalogTable, partitions,
+        partitionRowCount)
       (sizes.sum, newPartitions)

Review Comment:
   We can delete this line if `calculatePartitionStats` returns `sizes.sum`.



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")

Review Comment:
   Please move `partitionDates` out of the `Seq(true, false)...` block.



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            val expectedRowCount = 25
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Table row count should be updated
+            assert(getTableStats(tableName).rowCount.get == 3 * expectedRowCount)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {

Review Comment:
   ditto.



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)

Review Comment:
   ```suggestion
                  """.stripMargin)
   ```



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            val expectedRowCount = 25
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }

Review Comment:
   ```
   if (partitionStatsEnabled) {
     partitionDates.foreach { ds => ...
   } else {
     partitionDates.foreach { ds => ...
   }
   ```



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {

Review Comment:
   Use two-space indentation.



##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            val expectedRowCount = 25

Review Comment:
   ditto



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1803521431

   cc @dongjoon-hyun @cloud-fan 


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao closed pull request #43629: [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command
URL: https://github.com/apache/spark/pull/43629


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1801070668

   +1 for the new direction, @sunchao .


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387660060


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,83 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+        partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+               """.stripMargin)
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Table row count should be updated
+            assert(getTableStats(tableName).rowCount.get > 0)

Review Comment:
   OK, it's because I defined the table as `USING hive` while the partitions contain Parquet files ... should use `USING parquet` instead.
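
   A sketch of the fix described above, matching the declared table format to the Parquet files written into each partition location:
   
   ```
   spark.sql(
     s"""
        |CREATE TABLE $tableName (key STRING, value STRING)
        |USING parquet
        |PARTITIONED BY (ds STRING)
        |LOCATION '${path.toURI}'
      """.stripMargin)
   ```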



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1790056890

   What about the behavior of other SQL engines, like Hive, Presto, or Snowflake?
   
   1. Is there no existing SYNTAX for this proposal?
   2. If the proposed behavior is consistent with other SQL engines, we may be able to accept this PR simply as a bug fix.
   3. If the existing behavior is consistent with other SQL engines, also +1 for introducing new syntax for new improvement while keeping the existing behavior.
   
   Thank you for the suggestion and the improvement.


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1789938018

   AFAIK, users are used to using `REPAIR TABLE` to update partition statistics.
   
   I think `ANALYZE TABLE` should update the whole table's statistics rather than partition statistics. If we accept this PR, how can one update only the whole-table statistics without the partition statistics?


Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1388320262


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED =
+    buildConf("spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled")
+      .doc("When this config is enabled, Spark will also update partition statistics in analyze " +
+          "table command (i.e., ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN]). Note the command " +

Review Comment:
   Indentation?



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387554850


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   Is `calculateRowCountsPerPartition` used in only one place?



Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387340831


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -372,54 +372,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       partition.stats
     }
 
-    withTable(tableName) {
-      withTempPath { path =>
-        // Create a table with 3 partitions all located under a single top-level directory 'path'
-        sql(
-          s"""
-             |CREATE TABLE $tableName (key STRING, value STRING)
-             |USING hive
-             |PARTITIONED BY (ds STRING)
-             |LOCATION '${path.toURI}'
-             """.stripMargin)
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED.key -> partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
 
-        val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
 
-        partitionDates.foreach { ds =>
-          sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
-          sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
-              .format("parquet").save(s"$path/ds=$ds")
-        }
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * from src").write.mode(SaveMode.Overwrite)

Review Comment:
   Let's keep the original. It was `FROM` instead of `from`.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387527377


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   Hmm, what do you mean? It is used in multiple places.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387358004


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2671,6 +2671,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ANALYZE_PARTITION_STATS_ENABLED =
+    buildConf("spark.sql.statistics.update.partitionStats.enabled")

Review Comment:
   Thanks, updated.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1801071255

   cc @cloud-fan, too




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1799582900

   Thanks @dongjoon-hyun @beliefer for the feedback, and sorry for the late reply.
   
   > What about other SQL engine behavior like Hive, Presto, Snowflake?
   
   I'm not aware of a similar syntax in other SQL engines, so if we go this route, it will be Spark-only.
   
   Instead of introducing new syntax, I'm wondering whether we could add a flag to enable this behavior. By default the flag would be off, so existing users of this command would see no change.
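   
   A minimal sketch of what such a flag could look like, following the `buildConf` pattern from the SQLConf diff above (the key string and doc text here are assumptions for illustration, not the final values merged in the PR):
   
   ```scala
   // Hypothetical SQLConf entry; the key name and doc wording are illustrative only.
   val UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED =
     buildConf("spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled")
       .doc("When true, ANALYZE TABLE also updates partition-level statistics " +
         "(size in bytes and, when NOSCAN is not specified, row counts).")
       .booleanConf
       .createWithDefault(false)
   ```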
   




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1801223075

   Adding a new config to enable this behavior SGTM.




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387524303


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,83 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+        partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+               """.stripMargin)
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Table row count should be updated
+            assert(getTableStats(tableName).rowCount.get > 0)

Review Comment:
   I think it doesn't matter for this PR, but I'd still like to know what the root cause is.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387523005


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,85 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+          partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+              """.stripMargin)
+
+            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            val expectedRowCount = 25
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }

Review Comment:
   SGTM.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1789852270

   Thanks @dongjoon-hyun for the quick reply.
   
   > According to the title and first sentence of PR description, is this related to another JIRA
   
   Not really. The title means that, in addition to updating table stats, this PR proposes to also update partition stats with the `ANALYZE TABLE` command.
   
   > Just a question. Why don't we use `REPAIR TABLE` before this?
   
   Hmm, I think `REPAIR TABLE` serves a different purpose: it recovers partitions for an existing table created from a directory that contains sub-directories for partitions. `ANALYZE TABLE`, on the other hand, can be used to update table and partition stats. For instance, a partition could already exist for a table but have stats that are out of sync, e.g. because data was written to the partition directory without going through Spark.
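   
   To make the distinction concrete, a quick sketch (the table name `sales` is hypothetical, purely for illustration):
   
   ```scala
   // Hypothetical example; `sales` is illustrative only.
   // MSCK REPAIR TABLE registers partition directories that exist on disk
   // but are missing from the metastore.
   spark.sql("MSCK REPAIR TABLE sales")
   // ANALYZE TABLE refreshes statistics for partitions the metastore already
   // tracks, e.g. after files were added to a partition directory outside Spark.
   spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
   ```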
   




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1790181078

   Personally, I suggest:
   
   1. `ANALYZE TABLE <tableName>` still only updates the table-level statistics.
   2. `ANALYZE TABLE <tableName> PARTITION COMPUTE STATISTICS` still only updates the specified partition's statistics.
   3. Introduce new syntax to update both the table-level statistics and the statistics of all partitions.
   
   How about: `ANALYZE TABLE <tableName> ALL PARTITIONS COMPUTE STATISTICS [NOSCAN]`?




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387620669


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala:
##########
@@ -101,56 +98,19 @@ case class AnalyzePartitionCommand(
       if (noscan) {
         Map.empty
       } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+        CommandUtils.calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
       }
 
     // Update the metastore if newly computed statistics are different from those
     // recorded in the metastore.
-
-    val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
-      partitions.map(_.storage.locationUri))
-    val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
-      newStats.map(_ => p.copy(stats = newStats))
-    }
-
+    val (_, newPartitions) = CommandUtils.calculatePartitionStats(
+      sparkSession, tableMeta, partitions, Some(rowCounts))
     if (newPartitions.nonEmpty) {
       sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
     }
 
     Seq.empty[Row]
   }
 
-  private def calculateRowCountsPerPartition(

Review Comment:
   It is used in two places:
   - `CommandUtils.analyzeTable`
   - `AnalyzePartitionCommand.run`
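   
   For reference, a rough sketch of what the extracted `CommandUtils.calculatePartitionStats` could look like, reconstructed from the inline code deleted in the diff above (assuming it lives in `CommandUtils`, so the sibling helpers and catalog types are in scope; the exact types and the tuple's first element are guesses, not the PR's final signature):
   
   ```scala
   // Hypothetical reconstruction, assembled from the code removed from
   // AnalyzePartitionCommand in the diff above; the real helper may differ.
   def calculatePartitionStats(
       spark: SparkSession,
       tableMeta: CatalogTable,
       partitions: Seq[CatalogTablePartition],
       partitionRowCount: Option[Map[TablePartitionSpec, BigInt]])
       : (Seq[Long], Seq[CatalogTablePartition]) = {
     // Per-partition sizes, computed in one pass over all partition locations.
     val sizes = calculateMultipleLocationSizes(spark, tableMeta.identifier,
       partitions.map(_.storage.locationUri))
     // Keep only partitions whose newly computed stats differ from the metastore's.
     val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
       val newRowCount = partitionRowCount.flatMap(_.get(p.spec))
       val newStats = compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
       newStats.map(_ => p.copy(stats = newStats))
     }
     (sizes, newPartitions)
   }
   ```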





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1387505911


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala:
##########
@@ -363,6 +363,83 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-45731: update partition stats with ANALYZE TABLE") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): Option[CatalogStatistics] = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats
+    }
+
+    val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+
+    Seq(true, false).foreach { partitionStatsEnabled =>
+      withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
+        partitionStatsEnabled.toString) {
+        withTable(tableName) {
+          withTempPath { path =>
+            // Create a table with 3 partitions all located under a directory 'path'
+            sql(
+              s"""
+                 |CREATE TABLE $tableName (key STRING, value STRING)
+                 |USING hive
+                 |PARTITIONED BY (ds STRING)
+                 |LOCATION '${path.toURI}'
+               """.stripMargin)
+
+            partitionDates.foreach { ds =>
+              sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
+              sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
+                  .format("parquet").save(s"$path/ds=$ds")
+            }
+
+            assert(getCatalogTable(tableName).stats.isEmpty)
+            partitionDates.foreach { ds =>
+              assert(queryStats(ds).isEmpty)
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
+
+            // Table size should also have been updated
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Row count should NOT be updated with the `NOSCAN` option
+            assert(getTableStats(tableName).rowCount.isEmpty)
+
+            partitionDates.foreach { ds =>
+              val partStats = queryStats(ds)
+              if (partitionStatsEnabled) {
+                assert(partStats.nonEmpty)
+                assert(partStats.get.sizeInBytes > 0)
+                assert(partStats.get.rowCount.isEmpty)
+              } else {
+                assert(partStats.isEmpty)
+              }
+            }
+
+            sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+
+            assert(getTableStats(tableName).sizeInBytes > 0)
+            // Table row count should be updated
+            assert(getTableStats(tableName).rowCount.get > 0)

Review Comment:
   For some reason, the expected row count differs between my local run and the Spark CI run (75 vs. 69), so I updated the assertion to just check that `rowCount` is > 0 here.





Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1801224587

   Introducing a new flag to enable this behavior looks good.




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #43629:
URL: https://github.com/apache/spark/pull/43629#issuecomment-1802927767

   Thanks all! Updated the PR with a new flag.




Re: [PR] [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command [spark]

Posted by "patsukp-db (via GitHub)" <gi...@apache.org>.
patsukp-db commented on code in PR #43629:
URL: https://github.com/apache/spark/pull/43629#discussion_r1393596801


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -37,6 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.Utils

Review Comment:
   This causes a compile error. Perhaps we could do:
   
   ```
   import org.apache.spark.util.collection.{Utils => ColUtils}
   ```
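   
   Presumably the ambiguity comes from another `Utils` that is already in scope in `CommandUtils.scala` (an assumption on my part, likely `org.apache.spark.util.Utils`); with the rename, both can coexist:
   
   ```scala
   import org.apache.spark.util.Utils                           // assumed to be already imported
   import org.apache.spark.util.collection.{Utils => ColUtils}  // aliased to avoid the clash
   ```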



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org