You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/01/31 07:02:42 UTC

[spark] branch branch-2.4 updated: [SPARK-30065][SQL][2.4] DataFrameNaFunctions.drop should handle duplicate columns

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new c7c2bda  [SPARK-30065][SQL][2.4] DataFrameNaFunctions.drop should handle duplicate columns
c7c2bda is described below

commit c7c2bdadeace83bfe4b7918c46da9b09db864001
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Thu Jan 30 23:01:57 2020 -0800

    [SPARK-30065][SQL][2.4] DataFrameNaFunctions.drop should handle duplicate columns
    
    (Backport of #26700)
    
    ### What changes were proposed in this pull request?
    
    `DataFrameNaFunctions.drop` doesn't handle duplicate columns even when column names are not specified.
    
    ```Scala
    val left = Seq(("1", null), ("3", "4")).toDF("col1", "col2")
    val right = Seq(("1", "2"), ("3", null)).toDF("col1", "col2")
    val df = left.join(right, Seq("col1"))
    df.printSchema
    df.na.drop("any").show
    ```
    produces
    ```
    root
     |-- col1: string (nullable = true)
     |-- col2: string (nullable = true)
     |-- col2: string (nullable = true)
    
    org.apache.spark.sql.AnalysisException: Reference 'col2' is ambiguous, could be: col2, col2.;
      at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:240)
    ```
    The above failure occurs because columns are resolved by name: when multiple columns share the same name, resolution fails due to ambiguity.
    
    This PR updates `DataFrameNaFunctions.drop` so that, if the columns to drop are not specified, it resolves the ambiguity gracefully by applying `drop` to all the eligible columns. (Note that if the user specifies the columns, it will still fail due to ambiguity.)
    
    ### Why are the changes needed?
    
    If column names are not specified, `drop` should not fail due to ambiguity since it should still be able to apply `drop` to the eligible columns.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, now all the rows with nulls are dropped in the above example:
    ```
    scala> df.na.drop("any").show
    +----+----+----+
    |col1|col2|col2|
    +----+----+----+
    +----+----+----+
    ```
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    Closes #27411 from imback82/backport-SPARK-30065.
    
    Authored-by: Terry Kim <yu...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/DataFrameNaFunctions.scala    | 32 ++++++++++++++--------
 .../spark/sql/DataFrameNaFunctionsSuite.scala      | 25 +++++++++++++++--
 2 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index e705635..fcb440c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -41,7 +41,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    *
    * @since 1.3.1
    */
-  def drop(): DataFrame = drop("any", df.columns)
+  def drop(): DataFrame = drop0("any", outputAttributes)
 
   /**
    * Returns a new `DataFrame` that drops rows containing null or NaN values.
@@ -51,7 +51,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    *
    * @since 1.3.1
    */
-  def drop(how: String): DataFrame = drop(how, df.columns)
+  def drop(how: String): DataFrame = drop0(how, outputAttributes)
 
   /**
    * Returns a new `DataFrame` that drops rows containing any null or NaN values
@@ -90,11 +90,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def drop(how: String, cols: Seq[String]): DataFrame = {
-    how.toLowerCase(Locale.ROOT) match {
-      case "any" => drop(cols.size, cols)
-      case "all" => drop(1, cols)
-      case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
-    }
+    drop0(how, toAttributes(cols))
   }
 
   /**
@@ -120,10 +116,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
-    // Filtering condition:
-    // only keep the row if it has at least `minNonNulls` non-null and non-NaN values.
-    val predicate = AtLeastNNonNulls(minNonNulls, cols.map(name => df.resolve(name)))
-    df.filter(Column(predicate))
+    drop0(minNonNulls, toAttributes(cols))
   }
 
   /**
@@ -488,6 +481,23 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
     df.queryExecution.analyzed.output
   }
 
+  private def drop0(how: String, cols: Seq[Attribute]): DataFrame = {
+    how.toLowerCase(Locale.ROOT) match {
+      case "any" => drop0(cols.size, cols)
+      case "all" => drop0(1, cols)
+      case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
+    }
+  }
+
+  private def drop0(minNonNulls: Int, cols: Seq[Attribute]): DataFrame = {
+    // Filtering condition:
+    // only keep the row if it has at least `minNonNulls` non-null and non-NaN values.
+    val predicate = AtLeastNNonNulls(
+      minNonNulls,
+      outputAttributes.filter{ col => cols.exists(_.semanticEquals(col)) })
+    df.filter(Column(predicate))
+  }
+
   /**
    * Returns a new `DataFrame` that replaces null or NaN values in the specified
    * columns. If a specified column is not a numeric, string or boolean column,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index c1abd1e..1587c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -240,13 +240,14 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("fill with col(*)") {
+  test("fill/drop with col(*)") {
     val df = createDF()
     // If columns are specified with "*", they are ignored.
     checkAnswer(df.na.fill("new name", Seq("*")), df.collect())
+    checkAnswer(df.na.drop("any", Seq("*")), df.collect())
   }
 
-  test("fill with nested columns") {
+  test("fill/drop with nested columns") {
     val schema = new StructType()
       .add("c1", new StructType()
         .add("c1-1", StringType)
@@ -263,8 +264,9 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(df.select("c1.c1-1"),
       Row(null) :: Row("b1") :: Row(null) :: Nil)
 
-    // Nested columns are ignored for fill().
+    // Nested columns are ignored for fill() and drop().
     checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), data)
+    checkAnswer(df.na.drop("any", Seq("c1.c1-1")), data)
   }
 
   test("replace") {
@@ -394,4 +396,21 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
       df.na.fill("hello"),
       Row("1", "hello", "2") :: Row("3", "4", "hello") :: Nil)
   }
+
+  test("SPARK-30065: duplicate names are allowed for drop() if column names are not specified.") {
+    val left = Seq(("1", null), ("3", "4"), ("5", "6")).toDF("col1", "col2")
+    val right = Seq(("1", "2"), ("3", null), ("5", "6")).toDF("col1", "col2")
+    val df = left.join(right, Seq("col1"))
+
+    // If column names are specified, the following fails due to ambiguity.
+    val exception = intercept[AnalysisException] {
+      df.na.drop("any", Seq("col2"))
+    }
+    assert(exception.getMessage.contains("Reference 'col2' is ambiguous"))
+
+    // If column names are not specified, drop() is applied to all the eligible columns.
+    checkAnswer(
+      df.na.drop("any"),
+      Row("5", "6", "6") :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org