You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2017/07/04 01:28:00 UTC

[jira] [Commented] (SPARK-21109) union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe

    [ https://issues.apache.org/jira/browse/SPARK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073005#comment-16073005 ] 

Liang-Chi Hsieh commented on SPARK-21109:
-----------------------------------------

They have the same schema? Let's print schema on the two datasets:

{code}
scala> case class my_case(id0: Long, id1: Int, id2: Int, id3: String)
defined class my_case

scala> val data1 = Seq(my_case(0L, 0, 0, "0")).toDS
data1: org.apache.spark.sql.Dataset[my_case] = [id0: bigint, id1: int ... 2 more fields]

scala> val data2 = Seq(("1", 1, 1, 1L)).toDF("id3", "id1", "id2", "id0").as[my_case]
data2: org.apache.spark.sql.Dataset[my_case] = [id3: string, id1: int ... 2 more fields]

scala> data1.printSchema
root
 |-- id0: long (nullable = false)
 |-- id1: integer (nullable = false)
 |-- id2: integer (nullable = false)
 |-- id3: string (nullable = true)


scala> data2.printSchema
root
 |-- id3: string (nullable = true)
 |-- id1: integer (nullable = false)
 |-- id2: integer (nullable = false)
 |-- id0: long (nullable = false)
{code}

It's clear that they have different schema.

{{Dataset.union}} clearly states that it resolves column by position.

{{data1.union(data2.map{a=>a}).show}} enforces re-encode the data into my_case before union.

{code}
scala> val data3 = data2.map{a=>a}
data3: org.apache.spark.sql.Dataset[my_case] = [id0: bigint, id1: int ... 2 more fields]

scala> data3.printSchema
root
 |-- id0: long (nullable = false)
 |-- id1: integer (nullable = false)
 |-- id2: integer (nullable = false)
 |-- id3: string (nullable = true)
{code}

So it makes the schema as the same as {{data1}}. That makes the union work.

There's clearly defined semantics on {{union}}. I think it works accordingly.

The possibly problematic issue in above is, should we allow encoder works like the case of {{data2}} that is the order of arguments is different to the case class. For now I tend to think it's ok and it's more flexible. Btw, for such above case, I'd think of it's intended by users to create the dataset with different schema.


> union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21109
>                 URL: https://issues.apache.org/jira/browse/SPARK-21109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: Jerry Lam
>
> To reproduce the issue:
> {code}
> case class my_case(id0: Long, id1: Int, id2: Int, id3: String)
> val data1 = Seq(my_case(0L, 0, 0, "0")).toDS
> val data2 = Seq(("1", 1, 1, 1L)).toDF("id3", "id1", "id2", "id0").as[my_case]
> data1.show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> +---+---+---+---+
> data2.show
> +---+---+---+---+
> |id3|id1|id2|id0|
> +---+---+---+---+
> |  1|  1|  1|  1|
> +---+---+---+---+
> data1.union(data2).show
> org.apache.spark.sql.AnalysisException: Cannot up cast `id0` from string to bigint as it may truncate
> The type path of the target object is:
> - field (class: "scala.Long", name: "id0")
> - root class: "my_case"
> You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2123)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2153)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2140)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:336)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:334)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:236)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2140)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2136)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2136)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2121)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:258)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:209)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>   at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
>   at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:2859)
>   at org.apache.spark.sql.Dataset.union(Dataset.scala:1632)
> {code}
> Note that both data1 and data2 are the same type Dataset[my_case]
> A hacky way to fix the above is:
> {code}
> data1.union(data2.map{a=>a}).show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> |  1|  1|  1|  1|
> +---+---+---+---+
> {code}
> This bug is very obscure if you are implementing an interface with 2 input arguments of Dataset[A]. If you need to union two datasets for the implementation, some datasets will work and some don't. Or some will work but WRONG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org