You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/27 06:33:34 UTC

[spark] branch branch-3.0 updated: [SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c3f4946  [SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type
c3f4946 is described below

commit c3f494627b65e47c1cf7b7873fa28322a4cd0afa
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Feb 27 14:09:07 2020 +0800

    [SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to throw clear analysis exception if untyped `Dataset.select` takes typed column expression that needs input type.
    
    ### Why are the changes needed?
    
    `Dataset` provides few typed `select` helper functions to select typed column expressions. The maximum number of typed columns supported is 5. If wanting to select more than 5 typed columns, it silently calls untyped `Dataset.select` and can causes weird unresolved error, like:
    
    ```
    org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, N [...]
    'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull( [...]
    +- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS e#17, _6#11 AS F#18]
     +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
    
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
    ```
    
    However, to fully disallow typed columns as input to untyped `select` API will break current usage like `count` that is a `TypedColumn` in `functions`. In order to keep compatibility, we should allow current usage of certain `TypedColumn`s as input to untyped `select` API. For the `TypedColumn`s that will cause unresolved exception, we should explicitly let users know that they are incorrectly calling untyped `select` with typed columns which need input type.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, but this PR only refines the error message.
    
    When users call `Dataset.select` API with typed column that needs input type, an analysis exception will be thrown. Previously an unresolved error will be thrown.
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #27499 from viirya/SPARK-30590.
    
    Lead-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Co-authored-by: Liang-Chi Hsieh <li...@uber.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 160c144baa82b7f76301b718694187cf939ca465)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 21 ++++++++++++++++++-
 .../apache/spark/sql/DatasetAggregatorSuite.scala  | 24 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 42f3535..d85e23b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1432,7 +1433,25 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame = withPlan {
-    Project(cols.map(_.named), logicalPlan)
+    val untypedCols = cols.map {
+      case typedCol: TypedColumn[_, _] =>
+        // Checks if a `TypedColumn` has been inserted with
+        // specific input type and schema by `withInputType`.
+        val needInputType = typedCol.expr.find {
+          case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true
+          case _ => false
+        }.isDefined
+
+        if (!needInputType) {
+          typedCol
+        } else {
+          throw new AnalysisException(s"Typed column $typedCol that needs input type and schema " +
+            "cannot be passed in untyped `select` API. Use the typed `Dataset.select` API instead.")
+        }
+
+      case other => other
+    }
+    Project(untypedCols.map(_.named), logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 6ffe133..a22abd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -219,6 +219,15 @@ case class OptionBooleanIntAggregator(colName: String)
   def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder()
 }
 
+case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
+  def zero: Int = s
+  def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
+  def merge(b1: Int, b2: Int): Int = b1 + b2
+  def finish(b: Int): Int = b
+  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
+  def outputEncoder: Encoder[Int] = Encoders.scalaInt
+}
+
 class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
 
@@ -394,4 +403,19 @@ class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
     checkAnswer(group, Row("bob", Row(true, 3)) :: Nil)
     checkDataset(group.as[OptionBooleanIntData], OptionBooleanIntData("bob", Some((true, 3))))
   }
+
+  test("SPARK-30590: untyped select should not accept typed column that needs input type") {
+    val df = Seq((1, 2, 3, 4, 5, 6)).toDF("a", "b", "c", "d", "e", "f")
+    val fooAgg = (i: Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")
+
+    val agg1 = df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5))
+    checkDataset(agg1, (3, 5, 7, 9, 11))
+
+    // Passes typed columns to untyped `Dataset.select` API.
+    val err = intercept[AnalysisException] {
+      df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5), fooAgg(6))
+    }.getMessage
+    assert(err.contains("cannot be passed in untyped `select` API. " +
+      "Use the typed `Dataset.select` API instead."))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org