You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/09/15 06:06:41 UTC

[spark] branch master updated: [SPARK-40429][SQL] Only set KeyGroupedPartitioning when the referenced column is in the output

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 034e48fd47f [SPARK-40429][SQL] Only set KeyGroupedPartitioning when the referenced column is in the output
034e48fd47f is described below

commit 034e48fd47f49a603c1cad507608958f5beeddc8
Author: huaxingao <hu...@apple.com>
AuthorDate: Wed Sep 14 23:06:22 2022 -0700

    [SPARK-40429][SQL] Only set KeyGroupedPartitioning when the referenced column is in the output
    
    ### What changes were proposed in this pull request?
    Only set `KeyGroupedPartitioning` when the referenced column is in the output
    
    ### Why are the changes needed?
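    Bug fix: previously `KeyGroupedPartitioning` was set even when the columns referenced by its keys were not part of the scan's output.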
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Added a new test to `MetadataColumnSuite`.
    
    Closes #37886 from huaxingao/keyGroupedPartitioning.
    
    Authored-by: huaxingao <hu...@apple.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../datasources/v2/V2ScanPartitioningAndOrdering.scala   | 14 ++++++++++++--
 .../apache/spark/sql/connector/MetadataColumnSuite.scala | 16 ++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
index 8ab0dc70726..5c8c7cf420d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
@@ -41,8 +41,18 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe
   private def partitioning(plan: LogicalPlan) = plan.transformDown {
     case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None, _) =>
       val catalystPartitioning = scan.outputPartitioning() match {
-        case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
-          V2ExpressionUtils.toCatalystOpt(_, relation, relation.funCatalog)))
+        case kgp: KeyGroupedPartitioning =>
+          val partitioning = sequenceToOption(
+            kgp.keys().map(V2ExpressionUtils.toCatalystOpt(_, relation, relation.funCatalog)))
+          if (partitioning.isEmpty) {
+            None
+          } else {
+            if (partitioning.get.forall(p => p.references.subsetOf(d.outputSet))) {
+              partitioning
+            } else {
+              None
+            }
+          }
         case _: UnknownPartitioning => None
         case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
             "type: " + p.getClass.getSimpleName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 9b90ee43657..8454b9f85ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -216,4 +216,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       .withColumn("right_all", struct($"right.*"))
     checkAnswer(dfQuery, Row(1, "a", "b", Row(1, "a"), Row(1, "b")))
   }
+
+  test("SPARK-40429: Only set KeyGroupedPartitioning when the referenced column is in the output") {
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
+      sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      checkAnswer(
+        spark.table(tbl).select("index", "_partition"),
+        Seq(Row(0, "3"), Row(0, "2"), Row(0, "1"))
+      )
+
+      checkAnswer(
+        spark.table(tbl).select("id", "index", "_partition"),
+        Seq(Row(3, 0, "3"), Row(2, 0, "2"), Row(1, 0, "1"))
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org