Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/31 00:39:38 UTC

[GitHub] [spark] maropu commented on a change in pull request #31179: [SPARK-34113][SQL] Use metric data update metadata statistic's size and rowCount

maropu commented on a change in pull request #31179:
URL: https://github.com/apache/spark/pull/31179#discussion_r604513642



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>
+            val newStates = if (part.stats.isDefined && part.stats.get.rowCount.isDefined) {
+              CommandUtils.mergeNewStats(

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -199,6 +312,17 @@ object CommandUtils extends Logging {
     newStats
   }
 
+  def mergeNewStats(

Review comment:
       private
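
       A hedged sketch of the suggested visibility change. The signature is
       inferred from the call sites quoted in these hunks, and the body is only
       a guess at the "merge" semantics (add the newly written bytes/rows onto
       the existing stats); the PR's actual implementation may differ:

       ```scala
       import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

       // Sketch only: merge the stats of a new write into the existing
       // catalog stats instead of replacing them.
       private def mergeNewStats(
           oldStats: Option[CatalogStatistics],
           newTotalSize: BigInt,
           newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
         val oldSize = oldStats.map(_.sizeInBytes).getOrElse(BigInt(0))
         // Sum the row counts when both sides have one; otherwise keep
         // whichever is defined.
         val mergedRows = (oldStats.flatMap(_.rowCount), newRowCount) match {
           case (Some(o), Some(n)) => Some(o + n)
           case (o, n) => o.orElse(n)
         }
         Some(CatalogStatistics(sizeInBytes = oldSize + newTotalSize, rowCount = mergedRows))
       }
       ```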

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(

Review comment:
       `CommandUtils.mergeNewStats` -> `mergeNewStats`
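
       Since this call site is already inside `object CommandUtils`, the
       qualifier is redundant; the unqualified call from the hunk would look
       like:

       ```scala
       // Inside object CommandUtils the object-qualified call can be shortened:
       val newTableStats = mergeNewStats(
         newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
       ```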

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>

Review comment:
       `val spec = partitionSpec.mapValues(_.get)`
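
       The suggested one-liner is equivalent to the pattern match above. One
       caveat worth noting: on Scala 2.13, `Map.mapValues` returns a lazy
       `MapView`, so a `.toMap` is needed wherever a strict
       `Map[String, String]` (i.e. a `TablePartitionSpec`) is expected:

       ```scala
       // Reviewer's one-liner; .toMap materializes the Scala 2.13 MapView so
       // listPartitions receives a strict Map[String, String].
       val spec = partitionSpec.mapValues(_.get).toMap
       ```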

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>
+            val newStates = if (part.stats.isDefined && part.stats.get.rowCount.isDefined) {
+              CommandUtils.mergeNewStats(
+                part.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            } else {
+              CommandUtils.compareAndGetNewStats(
+                part.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            }
+            newStates.map(_ => part.copy(stats = newStates))
+          }
+          if (newTableStats.isDefined) {
+            catalog.alterTableStats(table.identifier, newTableStats)
+          }
+          if (newPartitions.nonEmpty) {
+            catalog.alterPartitions(table.identifier, newPartitions)
+          }
+        } else {
+          // update all partitions statistics
+          val partitions = statsTracker.partitionsStats.map { case (part, stats) =>
+            val partition = catalog.getPartition(table.identifier, part)
+            val newStats = Some(CatalogStatistics(
+              sizeInBytes = stats.numBytes, rowCount = Some(stats.numRows)))
+            partition.copy(stats = newStats)
+          }.toSeq
+          if (partitions.nonEmpty) {
+            catalog.alterPartitions(table.identifier, partitions)
+          }
+
+          if (isPartialPartitions) {
+            val newStats = CommandUtils.mergeNewStats(
+              newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            if (newStats.isDefined) {
+              catalog.alterTableStats(table.identifier, newStats)
+            }
+          } else {
+            val newStats = CommandUtils.compareAndGetNewStats(
+              newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            if (newStats.isDefined) {
+              catalog.alterTableStats(table.identifier, newStats)
+            }
+          }
+        }
+      } else {
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>

Review comment:
       `val spec = partitionSpec.mapValues(_.get)`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&

Review comment:
       Could you move `isPartialPartitions` into L102?
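
       "L102" refers to a line number in the PR's version of the file, which is
       not visible in this excerpt; the likely intent is to declare the flag
       next to its only use instead of at the top of the method. A hedged
       sketch of that move:

       ```scala
       // Sketch: isPartialPartitions is only consulted in this branch, so it
       // can be declared here rather than at the top of
       // updateTableAndPartitionStats.
       val isPartialPartitions = partitionSpec.nonEmpty &&
         partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
       if (isPartialPartitions) {
         // merge the new write stats into the existing table stats (see the
         // hunk above)
       }
       ```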

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -38,25 +39,40 @@ import org.apache.spark.util.SerializableConfiguration
  * These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
  */
 case class BasicWriteTaskStats(
-    partitions: Seq[InternalRow],
-    numFiles: Int,
-    numBytes: Long,
-    numRows: Long)
+    partitionsStats: mutable.Map[TablePartitionSpec, PartitionStats],

Review comment:
       `partitionsStats` -> `partitionSpecWithStats`
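
       A sketch of the suggested rename applied to the case class from this
       hunk; `PartitionStats` is the PR's new per-partition stats holder, and
       the remaining fields and the `WriteTaskStats` parent are assumed to stay
       as in the diff:

       ```scala
       import scala.collection.mutable

       import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

       case class BasicWriteTaskStats(
           partitionSpecWithStats: mutable.Map[TablePartitionSpec, PartitionStats],
           numFiles: Int,
           numBytes: Long,
           numRows: Long)
         extends WriteTaskStats
       ```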

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {

Review comment:
       Why do we need to handle the single partition case and the non-single partition case separately?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>

Review comment:
       This block seems to be the same as the `overwrite=false` case? https://github.com/apache/spark/pull/31179/files#diff-6309057f8f41f20f8de513ab67d7755aae5fb30d7441fc21000999c9e8e8e0bfR125-R140
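
       A hypothetical refactor for that duplication: both the `overwrite=true`
       single-partition path and the `overwrite=false` path could share one
       private helper. The helper name below is illustrative, not from the PR:

       ```scala
       // Update each partition's stats, merging when a row count already
       // exists and falling back to the plain size comparison otherwise.
       private def newPartitionsWithStats(
           partitions: Seq[CatalogTablePartition],
           statsTracker: BasicWriteJobStatsTracker): Seq[CatalogTablePartition] = {
         partitions.flatMap { part =>
           val newStats = if (part.stats.exists(_.rowCount.isDefined)) {
             mergeNewStats(part.stats, statsTracker.totalNumBytes,
               Some(statsTracker.totalNumOutput))
           } else {
             compareAndGetNewStats(part.stats, statsTracker.totalNumBytes,
               Some(statsTracker.totalNumOutput))
           }
           newStats.map(s => part.copy(stats = Some(s)))
         }
       }
       ```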




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org