You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2017/06/21 19:46:01 UTC

[jira] [Created] (SPARK-21165) Fail to write into partitioned hive table due to attribute reference not working with cast on partition column

Imran Rashid created SPARK-21165:
------------------------------------

             Summary: Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
                 Key: SPARK-21165
                 URL: https://issues.apache.org/jira/browse/SPARK-21165
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Imran Rashid
            Priority: Blocker


A simple "insert into ... select" involving partitioned hive tables fails.  Here's a simpler repro which doesn't involve hive at all -- this succeeds on 2.1.1, but fails on 2.2.0-rc5:

{noformat}
spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")
spark.sql("""DROP TABLE IF EXISTS src""")
spark.sql("""DROP TABLE IF EXISTS dest""")
spark.sql("""
CREATE TABLE src (first string, word string)
  PARTITIONED BY (length int)
""")

spark.sql("""
INSERT INTO src PARTITION(length) VALUES
  ('a', 'abc', 3),
  ('b', 'bcde', 4),
  ('c', 'cdefg', 5)
""")

spark.sql("""
  CREATE TABLE dest (word string, length int)
    PARTITIONED BY (first string)
""")

spark.sql("""
  INSERT INTO TABLE dest PARTITION(first) SELECT word, length, first FROM src
""")

spark.sql("""
  INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
""")
{noformat}

The exception is

{noformat}
17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute
, tree: first#74
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
        at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
        at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 40 more
{noformat}

The key to making this fail is the {{cast(first as string) as first}}.  Doing the same thing on any other column doesn't matter, it only matters on {{first}} (which is the partition column for the destination table).

Here's the explain plan from 2.2.0:
{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, cast('first as string) AS first#85]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
   +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
      +- SubqueryAlias src
         +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]

== Optimized Logical Plan ==                                                                                                                                                    [68/2430]
InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
   +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
      +- SubqueryAlias src
         +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]

== Physical Plan ==
ExecutedCommand
   +- InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
         +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
            +- SubqueryAlias src
               +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]|
{noformat}

And from 2.1.1:
{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
+- 'Project ['word, 'length, cast('first as string) AS first#55]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, cast(first#59 as string) AS first#55]
   +- MetastoreRelation default, src

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, first#59]
   +- MetastoreRelation default, src

== Physical Plan ==
InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
+- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
{noformat}

While this example query is somewhat contrived, this is really iimportant because if you try to do the same thing where {{src}} was created by hive, then the query fails with the same error.  In that case, the explain plan looks like:

{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, 'first]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
   +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
      +- Project [word#49, length#50, first#48]
         +- SubqueryAlias src
            +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Optimized Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
   +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
      +- Project [word#49, length#50, first#48]
         +- SubqueryAlias src
            +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Physical Plan ==
ExecutedCommand
   +- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
         +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
            +- Project [word#49, length#50, first#48]
               +- SubqueryAlias src
                  +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org