You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Eyal Farago (JIRA)" <ji...@apache.org> on 2018/04/26 07:20:00 UTC

[jira] [Commented] (SPARK-21109) union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe

    [ https://issues.apache.org/jira/browse/SPARK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453625#comment-16453625 ] 

Eyal Farago commented on SPARK-21109:
-------------------------------------

I've also encountered this issue in two separate ways,

in one of them I got a cast exception similar to the one [~superwai] encountered, however in a slightly different one I got wrong results:


{code:java}
case class DataV1(a: Int)
case class DataV2(a: Int, b: Int)

test("weired spark bug"){
  import sqlContext.implicits._
  val df1 = List(DataV1(1)).toDS().toDF()
  val df2 = List(DataV2(2, 3)).toDS().toDF()

  val df3 = df1
    .withColumn("commitUUID", functions lit 1)
    .withColumn("b", functions lit 42)

  val df4 = df2
    .withColumn("commitUUID", functions lit 2)
    .withColumn("b", functions.coalesce($"b", functions lit 42))

  val u = df3 union df4
  u.as[DataV2].show
}
and the output:
+---+----------+---+
|  a|commitUUID|  b|
+---+----------+---+
|  1|         1| 42|
|  2|         3|  2|
+---+----------+---+
{code}

you can see that in the second line value 2 was assigned to column 'b' instead of 'commitUUID' which was assigned with b's value (3).
slightly modifying this example and making the commitUUID field a String results with an exception similar to the one [~superwai] reported:


{code:java}
Cannot up cast `b` from string to int as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
org.apache.spark.sql.AnalysisException: Cannot up cast `b` from string to int as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
{code}

[~viirya], I've seen [~rxin] comment on SPARK-21043 and I read your reasoning in the comments above, and I must says this looks like 'justifying' the original implementation detail. it simply wasn't handled properly when the union method was first introduced, then some people wrote code that relied on the 'by index' mapping and by then it could no longer be modified...

I think you can see why it doesn't make much sense from the user's perspective, since as far as she cares both sides of the union are of the same type, the fact union can't see it is a bug in its implementation which is now deemed as a feature.

If we put aside the backward compatibility issue, then fixing this is as simple as adding an analyzer rules that introduces a project if needed, an alternative might be to reject this use case during analysis. the way I see it current state of affairs is greatly misleading.

[~superwai], one way of dealing with this situation (aside from upgrading spark and using the newly introduced unionByName) is to extract the fields order of one side and 'force' it on the other:


{code:java}
//make sure to align the schema
val right2 = right.select(left.columns.map(functions.col _) : _*)
left union right2
{code}



> union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21109
>                 URL: https://issues.apache.org/jira/browse/SPARK-21109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: Jerry Lam
>            Priority: Major
>
> To reproduce the issue:
> {code}
> case class my_case(id0: Long, id1: Int, id2: Int, id3: String)
> val data1 = Seq(my_case(0L, 0, 0, "0")).toDS
> val data2 = Seq(("1", 1, 1, 1L)).toDF("id3", "id1", "id2", "id0").as[my_case]
> data1.show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> +---+---+---+---+
> data2.show
> +---+---+---+---+
> |id3|id1|id2|id0|
> +---+---+---+---+
> |  1|  1|  1|  1|
> +---+---+---+---+
> data1.union(data2).show
> org.apache.spark.sql.AnalysisException: Cannot up cast `id0` from string to bigint as it may truncate
> The type path of the target object is:
> - field (class: "scala.Long", name: "id0")
> - root class: "my_case"
> You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2123)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2153)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2140)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:336)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:334)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:245)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:236)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2140)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2136)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2136)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2121)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:258)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:209)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>   at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
>   at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:2859)
>   at org.apache.spark.sql.Dataset.union(Dataset.scala:1632)
> {code}
> Note that both data1 and data2 are the same type Dataset[my_case]
> A hacky way to fix the above is:
> {code}
> data1.union(data2.map{a=>a}).show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> |  1|  1|  1|  1|
> +---+---+---+---+
> {code}
> This bug is very obscure if you are implementing an interface with 2 input arguments of Dataset[A]. If you need to union two datasets for the implementation, some datasets will work and some don't. Or some will work but WRONG.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org