You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/26 16:39:02 UTC

[spark] branch branch-2.4 updated: [SPARK-33726][SQL][2.4] Fix for Duplicate field names during Aggregation

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a5f844b  [SPARK-33726][SQL][2.4] Fix for Duplicate field names during Aggregation
a5f844b is described below

commit a5f844b0375231573a416da929752ecccfaeac98
Author: yliou <yl...@berkeley.edu>
AuthorDate: Tue Jan 26 16:38:20 2021 +0000

    [SPARK-33726][SQL][2.4] Fix for Duplicate field names during Aggregation
    
    ### What changes were proposed in this pull request?
    The `RowBasedKeyValueBatch` has two different implementations depending on whether the aggregation key and value uses only fixed length data types (`FixedLengthRowBasedKeyValueBatch`) or not (`VariableLengthRowBasedKeyValueBatch`).
    
    Before this PR the decision about the used implementation was based on by accessing the schema fields by their name.
    But if two fields has the same name and one with variable length and the other with fixed length type (and all the other fields are with fixed length types) a bad decision could be made.
    
    When `FixedLengthRowBasedKeyValueBatch` is chosen but there is a variable length field then an aggregation function could calculate with invalid values. This case is illustrated by the example used in the unit test:
    
    `with T as (select id as a, -id as x from range(3)),
            U as (select id as b, cast(id as string) as x from range(3))
    select T.x, U.x, min(a) as ma, min(b) as mb from T join U on a=b group by U.x, T.x`
    where the 'x' column in the left side of the join is a Long but on the right side is a String.
    
    ### Why are the changes needed?
    Fixes the issue where duplicate field name aggregation has null values in the dataframe.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT, tested manually on spark shell.
    
    Closes #31327 from yliou/SPARK-33726_2.4.
    
    Authored-by: yliou <yl...@berkeley.edu>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/expressions/RowBasedKeyValueBatch.java    | 10 ++++------
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala     | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 551443a..f4dceb4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -77,13 +77,11 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer {
     boolean allFixedLength = true;
     // checking if there is any variable length fields
     // there is probably a more succinct impl of this
-    for (String name : keySchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(keySchema.apply(name).dataType());
+    for (StructField field : keySchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
-    for (String name : valueSchema.fieldNames()) {
-      allFixedLength = allFixedLength
-              && UnsafeRow.isFixedLength(valueSchema.apply(name).dataType());
+    for (StructField field : valueSchema.fields()) {
+      allFixedLength = allFixedLength && UnsafeRow.isFixedLength(field.dataType());
     }
 
     if (allFixedLength) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 86a1086..bedf9e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -750,4 +750,18 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql(queryTemplate("FIRST")), Row(1))
     checkAnswer(sql(queryTemplate("LAST")), Row(3))
   }
+
+  test("SPARK-33726: Aggregation on a table where a column name is reused") {
+    val query =
+      """|with T as (
+         |select id as a, -id as x from range(3)),
+         |U as (
+         |select id as b, cast(id as string) as x from range(3))
+         |select T.x, U.x, min(a) as ma, min(b) as mb
+         |from T join U on a=b
+         |group by U.x, T.x
+      """.stripMargin
+    val df = spark.sql(query)
+    checkAnswer(df, Row(0, "0", 0, 0) :: Row(-1, "1", 1, 1) :: Row(-2, "2", 2, 2) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org