You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/27 06:33:34 UTC
[spark] branch branch-3.0 updated: [SPARK-30590][SQL] Untyped
select API cannot take typed column expression that needs input type
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c3f4946 [SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type
c3f4946 is described below
commit c3f494627b65e47c1cf7b7873fa28322a4cd0afa
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Feb 27 14:09:07 2020 +0800
[SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type
### What changes were proposed in this pull request?
This patch proposes to throw a clear analysis exception if the untyped `Dataset.select` takes a typed column expression that needs an input type.
### Why are the changes needed?
`Dataset` provides a few typed `select` helper functions for selecting typed column expressions. The maximum number of typed columns supported is 5. If a user selects more than 5 typed columns, the call silently falls through to the untyped `Dataset.select` and can cause a confusing unresolved-operator error, like:
```
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, N [...]
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull( [...]
+- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS e#17, _6#11 AS F#18]
+- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
```
However, fully disallowing typed columns as input to the untyped `select` API would break current usage such as `count`, which is a `TypedColumn` in `functions`. To keep compatibility, we should continue to allow the current usage of certain `TypedColumn`s as input to the untyped `select` API. For the `TypedColumn`s that would cause an unresolved exception, we should explicitly let users know that they are incorrectly calling the untyped `select` with typed columns that need an input type.
### Does this PR introduce any user-facing change?
Yes, but this PR only refines the error message.
When users call the `Dataset.select` API with a typed column that needs an input type, an analysis exception will be thrown. Previously, an unresolved-operator error was thrown.
### How was this patch tested?
Unit tests.
Closes #27499 from viirya/SPARK-30590.
Lead-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Co-authored-by: Liang-Chi Hsieh <li...@uber.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 160c144baa82b7f76301b718694187cf939ca465)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 21 ++++++++++++++++++-
.../apache/spark/sql/DatasetAggregatorSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 42f3535..d85e23b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1432,7 +1433,25 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
- Project(cols.map(_.named), logicalPlan)
+ val untypedCols = cols.map {
+ case typedCol: TypedColumn[_, _] =>
+ // Checks if a `TypedColumn` has been inserted with
+ // specific input type and schema by `withInputType`.
+ val needInputType = typedCol.expr.find {
+ case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true
+ case _ => false
+ }.isDefined
+
+ if (!needInputType) {
+ typedCol
+ } else {
+ throw new AnalysisException(s"Typed column $typedCol that needs input type and schema " +
+ "cannot be passed in untyped `select` API. Use the typed `Dataset.select` API instead.")
+ }
+
+ case other => other
+ }
+ Project(untypedCols.map(_.named), logicalPlan)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 6ffe133..a22abd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -219,6 +219,15 @@ case class OptionBooleanIntAggregator(colName: String)
def OptionalBoolIntEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder()
}
+case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
+ def zero: Int = s
+ def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
+ def merge(b1: Int, b2: Int): Int = b1 + b2
+ def finish(b: Int): Int = b
+ def bufferEncoder: Encoder[Int] = Encoders.scalaInt
+ def outputEncoder: Encoder[Int] = Encoders.scalaInt
+}
+
class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
import testImplicits._
@@ -394,4 +403,19 @@ class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
checkAnswer(group, Row("bob", Row(true, 3)) :: Nil)
checkDataset(group.as[OptionBooleanIntData], OptionBooleanIntData("bob", Some((true, 3))))
}
+
+ test("SPARK-30590: untyped select should not accept typed column that needs input type") {
+ val df = Seq((1, 2, 3, 4, 5, 6)).toDF("a", "b", "c", "d", "e", "f")
+ val fooAgg = (i: Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")
+
+ val agg1 = df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5))
+ checkDataset(agg1, (3, 5, 7, 9, 11))
+
+ // Passes typed columns to untyped `Dataset.select` API.
+ val err = intercept[AnalysisException] {
+ df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5), fooAgg(6))
+ }.getMessage
+ assert(err.contains("cannot be passed in untyped `select` API. " +
+ "Use the typed `Dataset.select` API instead."))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org