Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2017/10/13 05:11:00 UTC

[jira] [Updated] (SPARK-21165) Fail to write into partitioned hive table due to attribute reference not working with cast on partition column

     [ https://issues.apache.org/jira/browse/SPARK-21165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan updated SPARK-21165:
--------------------------------
    Fix Version/s: 2.3.0

> Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21165
>                 URL: https://issues.apache.org/jira/browse/SPARK-21165
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Imran Rashid
>            Assignee: Xiao Li
>            Priority: Blocker
>             Fix For: 2.2.0, 2.3.0
>
>
> A simple "insert into ... select" involving partitioned hive tables fails.  Here's a simpler repro which doesn't involve hive at all -- this succeeds on 2.1.1, but fails on 2.2.0-rc5:
> {noformat}
> spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")
> spark.sql("""DROP TABLE IF EXISTS src""")
> spark.sql("""DROP TABLE IF EXISTS dest""")
> spark.sql("""
> CREATE TABLE src (first string, word string)
>   PARTITIONED BY (length int)
> """)
> spark.sql("""
> INSERT INTO src PARTITION(length) VALUES
>   ('a', 'abc', 3),
>   ('b', 'bcde', 4),
>   ('c', 'cdefg', 5)
> """)
> spark.sql("""
>   CREATE TABLE dest (word string, length int)
>     PARTITIONED BY (first string)
> """)
> spark.sql("""
>   INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
> """)
> {noformat}
> The exception is
> {noformat}
> 17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: first#74
>         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>         at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>         at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>         at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>         at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
>         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
>         at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
>         at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
>         at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
>         at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
>         at scala.sys.package$.error(package.scala:27)
>         at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
>         at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
>         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>         ... 40 more
> {noformat}
> The key to making this fail is the {{cast(first as string) as first}}.  The same cast on any other column is harmless; the failure occurs only when it is applied to {{first}}, the partition column of the destination table.  The exception itself points at an attribute-id mismatch: the generated ordering is bound against {{first#74}}, while the child output only carries {{first#75}}.
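> Since {{first}} is already a string in {{src}}, one hedged workaround for this particular repro (an assumption on my part, not verified here) is to drop the redundant cast, so that no fresh attribute is introduced for the partition column:
> {noformat}
> // untested sketch: select the partition column directly instead of re-aliasing it via cast
> spark.sql("""
>   INSERT INTO TABLE dest PARTITION(first) SELECT word, length, first FROM src
> """)
> {noformat}
> Note this cannot help the hive-created case described further below, where the analyzer inserts the casts itself.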
> Here's the explain plan from 2.2.0:
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
> +- 'Project ['word, 'length, cast('first as string) AS first#85]
>    +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
>    +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
>       +- SubqueryAlias src
>          +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> == Optimized Logical Plan ==
> InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
>    +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
>       +- SubqueryAlias src
>          +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> == Physical Plan ==
> ExecutedCommand
>    +- InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
>          +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
>             +- SubqueryAlias src
>                +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> {noformat}
> And from 2.1.1:
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
> +- 'Project ['word, 'length, cast('first as string) AS first#55]
>    +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
> +- Project [word#60, length#58, cast(first#59 as string) AS first#55]
>    +- MetastoreRelation default, src
> == Optimized Logical Plan ==
> InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
> +- Project [word#60, length#58, first#59]
>    +- MetastoreRelation default, src
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
> +- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
> {noformat}
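> As an aside, plans like the two above can be captured with {{EXPLAIN EXTENDED}}; a minimal sketch (how the plans in this report were actually generated is my assumption):
> {noformat}
> // print the parsed/analyzed/optimized/physical plans for the failing insert
> spark.sql("""
>   EXPLAIN EXTENDED
>   INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
> """).show(numRows = 1, truncate = false)
> {noformat}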
> Note the contrast with 2.2.0: in 2.1.1 the optimizer eliminates the no-op cast entirely (the optimized {{Project}} carries {{first#59}} directly), so no fresh attribute survives to break the binding.  While this example query is somewhat contrived, this is really important, because if you try to do the same thing where {{src}} was created by hive itself, the query fails with the same error even though it has no explicit cast -- the analyzer inserts the casts automatically.  In that case, the explain plan looks like this (a sketch of the hive-side setup follows the plan):
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
> +- 'Project ['word, 'length, 'first]
>    +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
>    +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
>       +- Project [word#49, length#50, first#48]
>          +- SubqueryAlias src
>             +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> == Optimized Logical Plan ==
> InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
>    +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
>       +- Project [word#49, length#50, first#48]
>          +- SubqueryAlias src
>             +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> == Physical Plan ==
> ExecutedCommand
>    +- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
>          +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
>             +- Project [word#49, length#50, first#48]
>                +- SubqueryAlias src
>                   +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> {noformat}
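> For completeness, a sketch of the hive-side setup for this last case (assumed DDL matching the schema shown in the plan, not copied verbatim from my environment; it has to run in the hive CLI or beeline rather than Spark, so the table metadata comes from hive):
> {noformat}
> -- hypothetical setup for the hive-created variant of the repro
> CREATE DATABASE IF NOT EXISTS my_test;
> CREATE TABLE my_test.src (first STRING, word STRING)
>   PARTITIONED BY (length INT);
> CREATE TABLE my_test.dest (word STRING, length INT)
>   PARTITIONED BY (first STRING);
> {noformat}
> With those tables, the plain {{INSERT INTO TABLE dest PARTITION(first) SELECT word, length, first FROM src}} from the parsed plan above fails with the same binding error, because the analyzer wraps each column in a cast (see {{cast(first#48 as string) AS first#56}} in the analyzed plan).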


