Posted to issues@spark.apache.org by "Pen Xiao (JIRA)" <ji...@apache.org> on 2017/06/05 08:56:04 UTC

[jira] [Updated] (SPARK-20983) AnalysisException when joining data frames derived from the same duplicates-dropped one.

     [ https://issues.apache.org/jira/browse/SPARK-20983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pen Xiao updated SPARK-20983:
-----------------------------
    Description: 
Steps to reproduce:
```scala
// Run in the Spark 2.1.0 shell, where sc, spark, spark.implicits._ and
// org.apache.spark.sql.functions._ are already in scope.
val df = sc.parallelize(Array((1, 1, 1), (1, 1, 2), (1, 2, 1)))
  .toDF("c1", "c2", "c3")
  .dropDuplicates("c1", "c2")

// Both data frames are derived from the same duplicates-dropped df.
val df1 = df.select("c1", "c2")

val df2 = df.withColumn("c4", udf((c3: Int) => c3 + 1).apply($"c3")).select("c1", "c2", "c4")

df1.join(df2, Array("c1", "c2")).show
```

This fails with the following AnalysisException:
```
org.apache.spark.sql.AnalysisException: resolved attribute(s) c3#197 missing from c1#195,c2#196,c3#175 in operator !Project [c1#195, c2#196, c3#197];;
Project [c1#173, c2#174, c4#185]
+- Join Inner, ((c1#173 = c1#195) && (c2#174 = c2#196))
   :- Project [c1#173, c2#174]
   :  +- Project [c1#173, c2#174, c3#175]
   :     +- Aggregate [c1#173, c2#174], [c1#173, c2#174, first(c3#175, false) AS c3#175]
   :        +- Project [c1#173, c2#174, c3#175]
   :           +- Project [_1#169 AS c1#173, _2#170 AS c2#174, _3#171 AS c3#175]
   :              +- SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._1 AS _1#169, assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._2 AS _2#170, assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._3 AS _3#171]
   :                 +- ExternalRDD [obj#168]
   +- Project [c1#195, c2#196, c4#185]
      +- Project [c1#195, c2#196, c3#197, if (isnull(c3#197)) null else if (isnull(c3#197)) null else if (isnull(c3#197)) null else UDF(c3#197) AS c4#185]
         +- !Project [c1#195, c2#196, c3#197]
            +- Aggregate [c1#195, c2#196], [c1#195, c2#196, first(c3#197, false) AS c3#175]
               +- Project [c1#195, c2#196, c3#197]
                  +- Project [_1#169 AS c1#195, _2#170 AS c2#196, _3#171 AS c3#197]
                     +- SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._1 AS _1#169, assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._2 AS _2#170, assertnotnull(input[0, scala.Tuple3, true], top level Product input object)._3 AS _3#171]
                        +- ExternalRDD [obj#168]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2822)
  at org.apache.spark.sql.Dataset.join(Dataset.scala:775)
  at org.apache.spark.sql.Dataset.join(Dataset.scala:748)
  ... 48 elided
```
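
The error comes from the right-hand branch of the join: the !Project [c1#195, c2#196, c3#197] still references c3#197, while the Aggregate directly below it has been rewritten to output first(c3#197) AS c3#175, so c3#197 is no longer among the child's attributes. As a purely diagnostic sketch (not part of the original report), the analyzed plans of the two branches can be compared with the public queryExecution/explain APIs before attempting the join:

```scala
// Diagnostic only (not from the original report): inspect each branch's
// analyzed plan to see which attribute IDs the dropDuplicates() Aggregate
// produces on each side before the join is attempted.
println(df1.queryExecution.analyzed)
df2.explain(true)   // extended output: parsed, analyzed, optimized and physical plans
```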

What is strange is that the same code works fine once the dropDuplicates call is removed:

```scala
val df = sc.parallelize(Array((1, 1, 1), (1, 1, 2), (1, 2, 1))).toDF("c1", "c2", "c3")

val df1 = df.select("c1", "c2")

val df2 = df.withColumn("c4", udf((c3: Int) => c3 + 1).apply($"c3")).select("c1", "c2", "c4")

df1.join(df2, Array("c1", "c2")).show
```
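
A possible workaround (offered here only as a sketch, not verified as the proper fix for this ticket) is to break the shared lineage before deriving the second data frame, so that one side of the join gets a completely fresh set of attribute IDs:

```scala
// Hypothetical workaround sketch, not from the original report: rebuild one
// branch from its RDD and schema so it no longer shares attribute IDs with
// the dropDuplicates() plan on the other side. spark is the shell's SparkSession.
val deduped = sc.parallelize(Array((1, 1, 1), (1, 1, 2), (1, 2, 1)))
  .toDF("c1", "c2", "c3")
  .dropDuplicates("c1", "c2")

val left = deduped.select("c1", "c2")

// Fresh lineage: the plan below starts from a plain RDD scan, not from deduped.
val right = spark.createDataFrame(deduped.rdd, deduped.schema)
  .withColumn("c4", udf((c3: Int) => c3 + 1).apply($"c3"))
  .select("c1", "c2", "c4")

left.join(right, Array("c1", "c2")).show
```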


> AnalysisException when joining data frames derived from the same duplicates-dropped one.
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-20983
>                 URL: https://issues.apache.org/jira/browse/SPARK-20983
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Pen Xiao
>