You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/25 06:54:34 UTC

[spark] branch branch-3.1 updated: [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 63f07c7  [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation
63f07c7 is described below

commit 63f07c73c57d9d163f2360bbe2766e33fc723d3c
Author: yliou <yl...@berkeley.edu>
AuthorDate: Mon Jan 25 06:53:26 2021 +0000

    [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation
    
    ### What changes were proposed in this pull request?
    The `RowBasedKeyValueBatch` has two different implementations, depending on whether the aggregation key and value use only fixed-length data types (`FixedLengthRowBasedKeyValueBatch`) or not (`VariableLengthRowBasedKeyValueBatch`).
    
    Before this PR, the decision about which implementation to use was made by accessing the schema fields by their names.
    But if two fields have the same name, one with a variable-length type and the other with a fixed-length type (and all the other fields have fixed-length types), looking a field up by name can resolve to the other field of the same name, so a bad decision could be made.
    
    When `FixedLengthRowBasedKeyValueBatch` is chosen even though there is a variable-length field, an aggregation function could compute its result from invalid values. This case is illustrated by the example used in the unit test:
    
    `with T as (select id as a, -id as x from range(3)),
            U as (select id as b, cast(id as string) as x from range(3))
    select T.x, U.x, min(a) as ma, min(b) as mb from T join U on a=b group by U.x, T.x`
    where the 'x' column on the left side of the join is a Long, but the 'x' column on the right side is a String.
    
    ### Why are the changes needed?
    Fixes the issue where an aggregation over columns with duplicate field names produces null values in the resulting DataFrame.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test; also tested manually in the Spark shell.
    
    Closes #30788 from yliou/SPARK-33726.
    
    Authored-by: yliou <yl...@berkeley.edu>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 512cacf7c61acb3282720192b875555543a1f3eb)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/expressions/RowBasedKeyValueBatch.java    | 10 ++++------
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala     | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 6344cf1..9613c87 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -79,13 +79,11 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Cl
     boolean allFixedLength = true;
     // checking if there is any variable length fields
     // there is probably a more succinct impl of this
-    for (String name : keySchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(keySchema.apply(name).dataType());
+    for (StructField field : keySchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
-    for (String name : valueSchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(valueSchema.apply(name).dataType());
+    for (StructField field : valueSchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
 
     if (allFixedLength) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6603fc0..c6d134b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1077,6 +1077,20 @@ class DataFrameAggregateSuite extends QueryTest
     assert(aggs.head.output.map(_.dataType.simpleString).head ===
       aggs.last.output.map(_.dataType.simpleString).head)
   }
+
+  test("SPARK-33726: Aggregation on a table where a column name is reused") {
+    val query =
+      """|with T as (
+         |select id as a, -id as x from range(3)),
+         |U as (
+         |select id as b, cast(id as string) as x from range(3))
+         |select T.x, U.x, min(a) as ma, min(b) as mb
+         |from T join U on a=b
+         |group by U.x, T.x
+      """.stripMargin
+    val df = spark.sql(query)
+    checkAnswer(df, Row(0, "0", 0, 0) :: Row(-1, "1", 1, 1) :: Row(-2, "2", 2, 2) :: Nil)
+  }
 }
 
 case class B(c: Option[Double])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org