Posted to commits@spark.apache.org by zh...@apache.org on 2020/03/17 06:22:26 UTC
[spark] branch branch-3.0 updated: [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
This is an automated email from the ASF dual-hosted git repository.
zhenhuawang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 63baffd [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
63baffd is described below
commit 63baffdd255d0c1558ce91c1528b774ec1d35f41
Author: Zhenhua Wang <wz...@163.com>
AuthorDate: Tue Mar 17 14:20:16 2020 +0800
[SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
### What changes were proposed in this pull request?
For a bucketed table, when deciding output partitioning, if the output doesn't contain all bucket columns, the result is `UnknownPartitioning`. But when generating the RDD, Spark currently uses `createBucketedReadRDD` without checking whether the output contains all bucket columns, so the RDD and its output partitioning are inconsistent.
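To make the inconsistency concrete, here is a minimal reproduction sketch based on the scenario exercised in the modified test (the table and column names `bucketed_table`, `i`, `j`, `k` are illustrative, not part of the patch itself):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("SPARK-31164-repro").getOrCreate()
import spark.implicits._

// Bucket the table by "i", then read it back projecting only "j",
// so the scan output does not contain the bucket column.
Seq((1, "a", 10), (2, "b", 20)).toDF("i", "j", "k")
  .write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

val plan = spark.table("bucketed_table").select("j").queryExecution.executedPlan

// The scan reports UnknownPartitioning because "i" is missing from the output,
// yet before this fix its RDD was still built by createBucketedReadRDD,
// i.e. with one partition per bucket.
plan.foreach(p => println(p.getClass.getSimpleName + " -> " + p.outputPartitioning))
println(plan.execute().getNumPartitions)
```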
### Why are the changes needed?
To fix a bug.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Modified existing tests.
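For reference, the checks added to `BucketedReadSuite` assert on the newly exposed `bucketedScan` flag; a simplified sketch of that assertion (the helper mirrors `getFileScan` from the patch, the surrounding table setup is assumed from the example above):

```scala
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Collect the FileSourceScanExec node from a physical plan, as the new test helper does.
def getFileScan(plan: SparkPlan): FileSourceScanExec = {
  val fileScans = plan.collect { case f: FileSourceScanExec => f }
  assert(fileScans.nonEmpty, plan)
  fileScans.head
}

// Since the projection ("j") drops the bucket column ("i"), the scan must not be
// bucketed, so its RDD layout now matches the UnknownPartitioning it reports.
val scanDF = spark.table("bucketed_table").select("j")
assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
```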
Closes #27924 from wzhfy/inconsistent_rdd_partitioning.
Authored-by: Zhenhua Wang <wz...@163.com>
Signed-off-by: Zhenhua Wang <wz...@163.com>
(cherry picked from commit 1369a973cdefbab177871124d5ceb2ef55ac136d)
Signed-off-by: Zhenhua Wang <wz...@163.com>
---
.../spark/sql/execution/DataSourceScanExec.scala | 139 +++++++++++----------
.../spark/sql/sources/BucketedReadSuite.scala | 27 ++--
2 files changed, 87 insertions(+), 79 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c1f7f0a..8d488d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -253,74 +253,75 @@ case class FileSourceScanExec(
partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
}
- override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
- val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
- relation.bucketSpec
+ private def toAttribute(colName: String): Option[Attribute] =
+ output.find(_.name == colName)
+
+ // exposed for testing
+ lazy val bucketedScan: Boolean = {
+ if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
+ val spec = relation.bucketSpec.get
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ bucketColumns.size == spec.bucketColumnNames.size
} else {
- None
+ false
}
- bucketSpec match {
- case Some(spec) =>
- // For bucketed columns:
- // -----------------------
- // `HashPartitioning` would be used only when:
- // 1. ALL the bucketing columns are being read from the table
- //
- // For sorted columns:
- // ---------------------
- // Sort ordering should be used when ALL these criteria's match:
- // 1. `HashPartitioning` is being used
- // 2. A prefix (or all) of the sort columns are being read from the table.
- //
- // Sort ordering would be over the prefix subset of `sort columns` being read
- // from the table.
- // eg.
- // Assume (col0, col2, col3) are the columns read from the table
- // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
- // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
- // above
-
- def toAttribute(colName: String): Option[Attribute] =
- output.find(_.name == colName)
-
- val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
- if (bucketColumns.size == spec.bucketColumnNames.size) {
- val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
- val sortColumns =
- spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
- val shouldCalculateSortOrder =
- conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
- sortColumns.nonEmpty &&
- !hasPartitionsAvailableAtRunTime
-
- val sortOrder = if (shouldCalculateSortOrder) {
- // In case of bucketing, its possible to have multiple files belonging to the
- // same bucket in a given relation. Each of these files are locally sorted
- // but those files combined together are not globally sorted. Given that,
- // the RDD partition will not be sorted even if the relation has sort columns set
- // Current solution is to check if all the buckets have a single file in it
-
- val files = selectedPartitions.flatMap(partition => partition.files)
- val bucketToFilesGrouping =
- files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
- val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
-
- if (singleFilePartitions) {
- // TODO Currently Spark does not support writing columns sorting in descending order
- // so using Ascending order. This can be fixed in future
- sortColumns.map(attribute => SortOrder(attribute, Ascending))
- } else {
- Nil
- }
- } else {
- Nil
- }
- (partitioning, sortOrder)
+ }
+
+ override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+ if (bucketedScan) {
+ // For bucketed columns:
+ // -----------------------
+ // `HashPartitioning` would be used only when:
+ // 1. ALL the bucketing columns are being read from the table
+ //
+ // For sorted columns:
+ // ---------------------
+ // Sort ordering should be used when ALL these criteria's match:
+ // 1. `HashPartitioning` is being used
+ // 2. A prefix (or all) of the sort columns are being read from the table.
+ //
+ // Sort ordering would be over the prefix subset of `sort columns` being read
+ // from the table.
+ // eg.
+ // Assume (col0, col2, col3) are the columns read from the table
+ // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+ // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+ // above
+ val spec = relation.bucketSpec.get
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+ val sortColumns =
+ spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+ val shouldCalculateSortOrder =
+ conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+ sortColumns.nonEmpty &&
+ !hasPartitionsAvailableAtRunTime
+
+ val sortOrder = if (shouldCalculateSortOrder) {
+ // In case of bucketing, its possible to have multiple files belonging to the
+ // same bucket in a given relation. Each of these files are locally sorted
+ // but those files combined together are not globally sorted. Given that,
+ // the RDD partition will not be sorted even if the relation has sort columns set
+ // Current solution is to check if all the buckets have a single file in it
+
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val bucketToFilesGrouping =
+ files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+ val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+ if (singleFilePartitions) {
+ // TODO Currently Spark does not support writing columns sorting in descending order
+ // so using Ascending order. This can be fixed in future
+ sortColumns.map(attribute => SortOrder(attribute, Ascending))
} else {
- (UnknownPartitioning(0), Nil)
+ Nil
}
- case _ =>
- (UnknownPartitioning(0), Nil)
+ } else {
+ Nil
+ }
+ (partitioning, sortOrder)
+ } else {
+ (UnknownPartitioning(0), Nil)
}
}
@@ -393,11 +394,11 @@ case class FileSourceScanExec(
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
- val readRDD = relation.bucketSpec match {
- case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
- createBucketedReadRDD(bucketing, readFile, dynamicallySelectedPartitions, relation)
- case _ =>
- createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
+ val readRDD = if (bucketedScan) {
+ createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+ relation)
+ } else {
+ createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
}
sendDriverMetrics()
readRDD
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 57bbf20..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -100,6 +100,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
}
}
+ private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+ val fileScan = plan.collect { case f: FileSourceScanExec => f }
+ assert(fileScan.nonEmpty, plan)
+ fileScan.head
+ }
+
// To verify if the bucket pruning works, this function checks two conditions:
// 1) Check if the pruned buckets (before filtering) are empty.
// 2) Verify the final result is the same as the expected one
@@ -119,8 +125,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
- val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
- assert(rdd.isDefined, plan)
+ val fileScan = getFileScan(plan)
// if nothing should be pruned, skip the pruning test
if (bucketValues.nonEmpty) {
@@ -128,7 +133,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
bucketValues.foreach { value =>
matchedBuckets.set(BucketingUtils.getBucketIdFromValue(bucketColumn, numBuckets, value))
}
- val invalidBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+ val invalidBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
// return indexes of partitions that should have been pruned and are not empty
if (!matchedBuckets.get(index % numBuckets) && iter.nonEmpty) {
Iterator(index)
@@ -297,10 +302,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
val plan = bucketedDataFrame.queryExecution.executedPlan
- val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
- assert(rdd.isDefined, plan)
+ val fileScan = getFileScan(plan)
- val emptyBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+ val emptyBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
// return indexes of empty partitions
if (iter.isEmpty) {
Iterator(index)
@@ -762,10 +766,13 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
- checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
+ val scanDF = spark.table("bucketed_table").select("j")
+ assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+ checkAnswer(scanDF, df1.select("j"))
- checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
- df1.groupBy("j").agg(max("k")))
+ val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+ assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+ checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}