Posted to commits@spark.apache.org by zh...@apache.org on 2020/03/17 06:22:26 UTC

[spark] branch branch-3.0 updated: [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns

This is an automated email from the ASF dual-hosted git repository.

zhenhuawang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 63baffd  [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
63baffd is described below

commit 63baffdd255d0c1558ce91c1528b774ec1d35f41
Author: Zhenhua Wang <wz...@163.com>
AuthorDate: Tue Mar 17 14:20:16 2020 +0800

    [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
    
    ### What changes were proposed in this pull request?
    
    For a bucketed table, when deciding output partitioning, if the output doesn't contain all bucket columns, the result is `UnknownPartitioning`. But when generating the RDD, Spark currently uses `createBucketedReadRDD` without checking whether the output contains all bucket columns, so the RDD and its reported output partitioning are inconsistent.
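    
    A minimal sketch of the scenario (run in spark-shell; column names follow the modified test below):
    
    ```scala
    // Table bucketed by "i"; the query below reads only "j",
    // i.e. the output does not contain all bucket columns.
    val df = spark.range(0, 100).selectExpr("id % 5 as i", "id as j", "id as k")
    df.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
    
    val plan = spark.table("bucketed_table").select("j").queryExecution.executedPlan
    // Before this patch: the scan reports UnknownPartitioning(0), yet its RDD is
    // still built by createBucketedReadRDD (one partition per bucket).
    // With this patch: bucketedScan is false and createNonBucketedReadRDD is used,
    // so the RDD and the reported partitioning agree.
    ```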
    
    ### Why are the changes needed?
    
    To fix the inconsistency described above between the scan's RDD and its reported output partitioning.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified existing tests.
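    
    The core of the new check, simplified from the `BucketedReadSuite` diff below (assumes the bucketed table from the test setup):
    
    ```scala
    import org.apache.spark.sql.execution.FileSourceScanExec
    
    // Reading only "j" from a table bucketed by "i" must not produce a bucketed scan.
    val scanDF = spark.table("bucketed_table").select("j")
    val fileScan = scanDF.queryExecution.executedPlan
      .collect { case f: FileSourceScanExec => f }.head
    assert(!fileScan.bucketedScan)
    ```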
    
    Closes #27924 from wzhfy/inconsistent_rdd_partitioning.
    
    Authored-by: Zhenhua Wang <wz...@163.com>
    Signed-off-by: Zhenhua Wang <wz...@163.com>
    (cherry picked from commit 1369a973cdefbab177871124d5ceb2ef55ac136d)
    Signed-off-by: Zhenhua Wang <wz...@163.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 139 +++++++++++----------
 .../spark/sql/sources/BucketedReadSuite.scala      |  27 ++--
 2 files changed, 87 insertions(+), 79 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c1f7f0a..8d488d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -253,74 +253,75 @@ case class FileSourceScanExec(
     partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
   }
 
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
-    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
-      relation.bucketSpec
+  private def toAttribute(colName: String): Option[Attribute] =
+    output.find(_.name == colName)
+
+  // exposed for testing
+  lazy val bucketedScan: Boolean = {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      bucketColumns.size == spec.bucketColumnNames.size
     } else {
-      None
+      false
     }
-    bucketSpec match {
-      case Some(spec) =>
-        // For bucketed columns:
-        // -----------------------
-        // `HashPartitioning` would be used only when:
-        // 1. ALL the bucketing columns are being read from the table
-        //
-        // For sorted columns:
-        // ---------------------
-        // Sort ordering should be used when ALL these criteria's match:
-        // 1. `HashPartitioning` is being used
-        // 2. A prefix (or all) of the sort columns are being read from the table.
-        //
-        // Sort ordering would be over the prefix subset of `sort columns` being read
-        // from the table.
-        // eg.
-        // Assume (col0, col2, col3) are the columns read from the table
-        // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
-        // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
-        // above
-
-        def toAttribute(colName: String): Option[Attribute] =
-          output.find(_.name == colName)
-
-        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
-        if (bucketColumns.size == spec.bucketColumnNames.size) {
-          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
-          val sortColumns =
-            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
-          val shouldCalculateSortOrder =
-            conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
-              sortColumns.nonEmpty &&
-              !hasPartitionsAvailableAtRunTime
-
-          val sortOrder = if (shouldCalculateSortOrder) {
-            // In case of bucketing, its possible to have multiple files belonging to the
-            // same bucket in a given relation. Each of these files are locally sorted
-            // but those files combined together are not globally sorted. Given that,
-            // the RDD partition will not be sorted even if the relation has sort columns set
-            // Current solution is to check if all the buckets have a single file in it
-
-            val files = selectedPartitions.flatMap(partition => partition.files)
-            val bucketToFilesGrouping =
-              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
-            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
-
-            if (singleFilePartitions) {
-              // TODO Currently Spark does not support writing columns sorting in descending order
-              // so using Ascending order. This can be fixed in future
-              sortColumns.map(attribute => SortOrder(attribute, Ascending))
-            } else {
-              Nil
-            }
-          } else {
-            Nil
-          }
-          (partitioning, sortOrder)
+  }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    if (bucketedScan) {
+      // For bucketed columns:
+      // -----------------------
+      // `HashPartitioning` would be used only when:
+      // 1. ALL the bucketing columns are being read from the table
+      //
+      // For sorted columns:
+      // ---------------------
+      // Sort ordering should be used when ALL these criteria's match:
+      // 1. `HashPartitioning` is being used
+      // 2. A prefix (or all) of the sort columns are being read from the table.
+      //
+      // Sort ordering would be over the prefix subset of `sort columns` being read
+      // from the table.
+      // eg.
+      // Assume (col0, col2, col3) are the columns read from the table
+      // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+      // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+      // above
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+      val sortColumns =
+        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+      val shouldCalculateSortOrder =
+        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+          sortColumns.nonEmpty &&
+          !hasPartitionsAvailableAtRunTime
+
+      val sortOrder = if (shouldCalculateSortOrder) {
+        // In case of bucketing, its possible to have multiple files belonging to the
+        // same bucket in a given relation. Each of these files are locally sorted
+        // but those files combined together are not globally sorted. Given that,
+        // the RDD partition will not be sorted even if the relation has sort columns set
+        // Current solution is to check if all the buckets have a single file in it
+
+        val files = selectedPartitions.flatMap(partition => partition.files)
+        val bucketToFilesGrouping =
+          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+        if (singleFilePartitions) {
+          // TODO Currently Spark does not support writing columns sorting in descending order
+          // so using Ascending order. This can be fixed in future
+          sortColumns.map(attribute => SortOrder(attribute, Ascending))
         } else {
-          (UnknownPartitioning(0), Nil)
+          Nil
         }
-      case _ =>
-        (UnknownPartitioning(0), Nil)
+      } else {
+        Nil
+      }
+      (partitioning, sortOrder)
+    } else {
+      (UnknownPartitioning(0), Nil)
     }
   }
 
@@ -393,11 +394,11 @@ case class FileSourceScanExec(
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
-    val readRDD = relation.bucketSpec match {
-      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
-        createBucketedReadRDD(bucketing, readFile, dynamicallySelectedPartitions, relation)
-      case _ =>
-        createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
     }
     sendDriverMetrics()
     readRDD
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 57bbf20..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -100,6 +100,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+    val fileScan = plan.collect { case f: FileSourceScanExec => f }
+    assert(fileScan.nonEmpty, plan)
+    fileScan.head
+  }
+
   // To verify if the bucket pruning works, this function checks two conditions:
   //   1) Check if the pruned buckets (before filtering) are empty.
   //   2) Verify the final result is the same as the expected one
@@ -119,8 +125,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
       // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
       val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
-      assert(rdd.isDefined, plan)
+      val fileScan = getFileScan(plan)
 
       // if nothing should be pruned, skip the pruning test
       if (bucketValues.nonEmpty) {
@@ -128,7 +133,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         bucketValues.foreach { value =>
           matchedBuckets.set(BucketingUtils.getBucketIdFromValue(bucketColumn, numBuckets, value))
         }
-        val invalidBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+        val invalidBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
           // return indexes of partitions that should have been pruned and are not empty
           if (!matchedBuckets.get(index % numBuckets) && iter.nonEmpty) {
             Iterator(index)
@@ -297,10 +302,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
       val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
       val plan = bucketedDataFrame.queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
-      assert(rdd.isDefined, plan)
+      val fileScan = getFileScan(plan)
 
-      val emptyBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+      val emptyBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
         // return indexes of empty partitions
         if (iter.isEmpty) {
           Iterator(index)
@@ -762,10 +766,13 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
+      val scanDF = spark.table("bucketed_table").select("j")
+      assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+      checkAnswer(scanDF, df1.select("j"))
 
-      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
-        df1.groupBy("j").agg(max("k")))
+      val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+      assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+      checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
     }
   }
 


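A note on the prefix rule described in the `DataSourceScanExec` comment above: it can be sketched with plain strings (illustrative only, not Spark API):

    // Columns actually read from the table.
    val output = Seq("col0", "col2", "col3")

    // Mirrors the map(...).takeWhile(_.isDefined) logic in outputOrdering.
    def sortPrefix(sortColumnNames: Seq[String]): Seq[String] =
      sortColumnNames.map(c => output.find(_ == c)).takeWhile(_.isDefined).map(_.get)

    sortPrefix(Seq("col0", "col1"))  // Seq("col0"): ordering over col0 only
    sortPrefix(Seq("col1", "col0"))  // Seq():       no sort ordering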