You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/25 06:54:51 UTC

[spark] branch branch-3.0 updated: [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b059bb2  [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation
b059bb2 is described below

commit b059bb25d15bbbc093d2c70d26c4e220fb7fbe5d
Author: yliou <yl...@berkeley.edu>
AuthorDate: Mon Jan 25 06:53:26 2021 +0000

    [SPARK-33726][SQL] Fix for Duplicate field names during Aggregation
    
    ### What changes were proposed in this pull request?
    The `RowBasedKeyValueBatch` has two different implementations, depending on whether the aggregation key and value use only fixed-length data types (`FixedLengthRowBasedKeyValueBatch`) or not (`VariableLengthRowBasedKeyValueBatch`).
    
    Before this PR, the decision about which implementation to use was made by looking up the schema fields by name.
    But if two fields have the same name, one with a variable-length type and the other with a fixed-length type (and all the other fields have fixed-length types), the lookup by name can resolve to the wrong field and a bad decision could be made.
    
    When `FixedLengthRowBasedKeyValueBatch` is chosen even though there is a variable-length field, an aggregation function could compute with invalid values. This case is illustrated by the example used in the unit test:
    
    `with T as (select id as a, -id as x from range(3)),
            U as (select id as b, cast(id as string) as x from range(3))
    select T.x, U.x, min(a) as ma, min(b) as mb from T join U on a=b group by U.x, T.x`
    where the 'x' column in the left side of the join is a Long but on the right side is a String.
    
    ### Why are the changes needed?
    Fixes the issue where an aggregation over fields with duplicate names produces null values in the DataFrame.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT, tested manually on spark shell.
    
    Closes #30788 from yliou/SPARK-33726.
    
    Authored-by: yliou <yl...@berkeley.edu>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 512cacf7c61acb3282720192b875555543a1f3eb)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/expressions/RowBasedKeyValueBatch.java    | 10 ++++------
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala     | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 6344cf1..9613c87 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -79,13 +79,11 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Cl
     boolean allFixedLength = true;
     // checking if there is any variable length fields
     // there is probably a more succinct impl of this
-    for (String name : keySchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(keySchema.apply(name).dataType());
+    for (StructField field : keySchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
-    for (String name : valueSchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(valueSchema.apply(name).dataType());
+    for (StructField field : valueSchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
 
     if (allFixedLength) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 2cb7790..c1bff68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1052,6 +1052,20 @@ class DataFrameAggregateSuite extends QueryTest
     assert(aggs.head.output.map(_.dataType.simpleString).head ===
       aggs.last.output.map(_.dataType.simpleString).head)
   }
+
+  test("SPARK-33726: Aggregation on a table where a column name is reused") {
+    val query =
+      """|with T as (
+         |select id as a, -id as x from range(3)),
+         |U as (
+         |select id as b, cast(id as string) as x from range(3))
+         |select T.x, U.x, min(a) as ma, min(b) as mb
+         |from T join U on a=b
+         |group by U.x, T.x
+      """.stripMargin
+    val df = spark.sql(query)
+    checkAnswer(df, Row(0, "0", 0, 0) :: Row(-1, "1", 1, 1) :: Row(-2, "2", 2, 2) :: Nil)
+  }
 }
 
 case class B(c: Option[Double])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org