Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2017/10/12 14:10:00 UTC
[jira] [Commented] (SPARK-21165) Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
[ https://issues.apache.org/jira/browse/SPARK-21165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201988#comment-16201988 ]
Apache Spark commented on SPARK-21165:
--------------------------------------
User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/19483
> Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
> --------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-21165
> URL: https://issues.apache.org/jira/browse/SPARK-21165
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Imran Rashid
> Assignee: Xiao Li
> Priority: Blocker
> Fix For: 2.2.0
>
>
> A simple "insert into ... select" involving partitioned hive tables fails. Here's a simpler repro which doesn't involve hive at all -- this succeeds on 2.1.1, but fails on 2.2.0-rc5:
> {noformat}
> spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")
> spark.sql("""DROP TABLE IF EXISTS src""")
> spark.sql("""DROP TABLE IF EXISTS dest""")
> spark.sql("""
> CREATE TABLE src (first string, word string)
> PARTITIONED BY (length int)
> """)
> spark.sql("""
> INSERT INTO src PARTITION(length) VALUES
> ('a', 'abc', 3),
> ('b', 'bcde', 4),
> ('c', 'cdefg', 5)
> """)
> spark.sql("""
> CREATE TABLE dest (word string, length int)
> PARTITIONED BY (first string)
> """)
> spark.sql("""
> INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
> """)
> {noformat}
> The exception is
> {noformat}
> 17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute
> , tree: first#74
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
> at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
> at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> ... 40 more
> {noformat}
> The key to making this fail is the {{cast(first as string) as first}}. Doing the same thing to any other column has no effect; it only matters for {{first}}, which is the partition column of the destination table.
> Here's the explain plan from 2.2.0:
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
> +- 'Project ['word, 'length, cast('first as string) AS first#85]
> +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
> +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
> +- SubqueryAlias src
> +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> == Optimized Logical Plan ==
> InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
> +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
> +- SubqueryAlias src
> +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> == Physical Plan ==
> ExecutedCommand
> +- InsertIntoHiveTable CatalogTable(
> Database: default
> Table: dest
> Owner: irashid
> Created: Wed Jun 21 14:25:13 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`first`]
> Schema: root
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> -- first: string (nullable = true)
> ), Map(first -> None), false, false
> +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
> +- SubqueryAlias src
> +- CatalogRelation CatalogTable(
> Database: default
> Table: src
> Owner: irashid
> Created: Wed Jun 21 14:25:11 CDT 2017
> Last Access: Wed Dec 31 18:00:00 CST 1969
> Type: MANAGED
> Provider: hive
> Properties: [serialization.format=1]
> Statistics: 9223372036854775807 bytes
> Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
> Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat: org.apache.hadoop.mapred.TextInputFormat
> OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Partition Provider: Catalog
> Partition Columns: [`length`]
> Schema: root
> -- first: string (nullable = true)
> -- word: string (nullable = true)
> -- length: integer (nullable = true)
> ), [first#88, word#89], [length#90]
> {noformat}
> And from 2.1.1:
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
> +- 'Project ['word, 'length, cast('first as string) AS first#55]
> +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
> +- Project [word#60, length#58, cast(first#59 as string) AS first#55]
> +- MetastoreRelation default, src
> == Optimized Logical Plan ==
> InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
> +- Project [word#60, length#58, first#59]
> +- MetastoreRelation default, src
> == Physical Plan ==
> InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
> +- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
> {noformat}
> While this example query is somewhat contrived, this is really important, because if you try to do the same thing where {{src}} was created by hive, the query fails with the same error. In that case, the explain plan looks like:
> {noformat}
> == Parsed Logical Plan ==
> 'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
> +- 'Project ['word, 'length, 'first]
> +- 'UnresolvedRelation `src`
> == Analyzed Logical Plan ==
> InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
> +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
> +- Project [word#49, length#50, first#48]
> +- SubqueryAlias src
> +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> == Optimized Logical Plan ==
> InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
> +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
> +- Project [word#49, length#50, first#48]
> +- SubqueryAlias src
> +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> == Physical Plan ==
> ExecutedCommand
> +- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
> +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
> +- Project [word#49, length#50, first#48]
> +- SubqueryAlias src
> +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
> {noformat}
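As a side note for anyone digging into this: the {{Couldn't find first#74 in [word#76,length#77,first#75]}} error is characteristic of Catalyst binding attribute references by expression ID rather than by name. The sketch below is plain Scala, not Spark's actual {{BindReferences}} code; the {{Attr}} class and {{bindReference}} helper are hypothetical stand-ins that only illustrate how an alias minting a fresh expression ID can leave a stale reference unbindable:

```scala
// Hypothetical, simplified model of Catalyst attribute binding.
// An attribute is identified by (name, exprId); binding matches ONLY on exprId.
case class Attr(name: String, exprId: Int) {
  override def toString: String = s"$name#$exprId"
}

// Resolve a reference to an ordinal in the child's output, by expression ID.
def bindReference(ref: Attr, input: Seq[Attr]): Int = {
  val ordinal = input.indexWhere(_.exprId == ref.exprId)
  if (ordinal == -1)
    sys.error(s"Couldn't find $ref in [${input.mkString(",")}]")
  ordinal
}

object Demo {
  def main(args: Array[String]): Unit = {
    // The child's actual output, as in the stack trace above.
    val childOutput = Seq(Attr("word", 76), Attr("length", 77), Attr("first", 75))

    // Same-ID reference binds fine, regardless of position.
    println(bindReference(Attr("length", 77), childOutput)) // prints: 1

    // `cast(first as string) AS first` minted a NEW exprId, so a plan node
    // still holding the old one fails to bind even though the NAME matches.
    try bindReference(Attr("first", 74), childOutput)
    catch {
      // prints: Couldn't find first#74 in [word#76,length#77,first#75]
      case e: RuntimeException => println(e.getMessage)
    }
  }
}
```

Under this reading, the alias on the partition column is what desynchronizes the IDs between the sort ordering generated for the dynamic-partition write and the {{Project}} underneath it.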
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)