Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:01:06 UTC

[jira] [Updated] (SPARK-21162) Cannot count rows in an empty Hive table stored as parquet when spark.sql.parquet.cacheMetadata is set to false

     [ https://issues.apache.org/jira/browse/SPARK-21162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-21162:
---------------------------------
    Labels: bulk-closed  (was: )

> Cannot count rows in an empty Hive table stored as parquet when spark.sql.parquet.cacheMetadata is set to false
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21162
>                 URL: https://issues.apache.org/jira/browse/SPARK-21162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 1.6.3
>            Reporter: Tom Ogle
>            Priority: Major
>              Labels: bulk-closed
>
> With spark.sql.parquet.cacheMetadata set to false, creating an empty Hive table stored as Parquet and then counting its rows through SparkSQL throws an IOException. The issue does not affect Spark 2. It is particularly inconvenient in environments running Spark 1.6.x where spark.sql.parquet.cacheMetadata is explicitly set to false, such as Google DataProc 1.0.
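> A possible workaround on 1.6.x (a sketch only, not verified as part of this report) is to restore spark.sql.parquet.cacheMetadata to its default of true before reading the table, e.g. given the hiveContext and table from the reproduction below:
> {code}
> // Workaround sketch: undo the override before counting; assumes the HiveContext
> // and my_test_db.test_table created in the reproduction further down.
> hiveContext.setConf("spark.sql.parquet.cacheMetadata", "true")
> hiveContext.table("my_test_db.test_table").count()  // should return 0 instead of throwing
> {code}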
> Here is the stacktrace:
> {code}
> 17/06/21 15:30:10 INFO ParquetRelation: Reading Parquet file(s) from 
> Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#30L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
>       +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: <snip>/my_test_db.db/test_table
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> 	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
> 	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
> 	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
> 	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
> 	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
> 	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
> 	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
> 	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
> 	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
> 	at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
> 	at App$.main(App.scala:23)
> 	at App.main(App.scala)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
>    +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: <snip>/my_test_db.db/test_table
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> 	at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> 	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
> 	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> 	... 19 more
> Caused by: java.io.IOException: No input paths specified in job
> 	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
> 	at org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:339)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1$$anon$4.listStatus(ParquetRelation.scala:358)
> 	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
> 	at org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:294)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.getPartitions(ParquetRelation.scala:363)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> 	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> 	at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
> 	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
> 	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> 	... 27 more
> {code}
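> The innermost cause comes from Hadoop's FileInputFormat, which rejects a job configuration that carries no input paths. A minimal standalone illustration (a hypothetical snippet for a Scala REPL with Hadoop on the classpath, not code taken from Spark):
> {code}
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
> // No FileInputFormat.addInputPath(...) has been called on this job, so
> // getSplits -> listStatus throws IOException: "No input paths specified in job".
> val job = Job.getInstance()
> new TextInputFormat().getSplits(job)
> {code}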
> Here is some Scala code to reproduce the issue locally:
> App.scala:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkConf, SparkContext}
> object App {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("Testing Issue").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     val hiveContext = new HiveContext(sc)
>     // Explicitly disabling Parquet metadata caching is what triggers the failure below.
>     hiveContext.setConf("spark.sql.parquet.cacheMetadata", "false")
>     val databaseName = "my_test_db"
>     val tableName = "test_table"
>     val fullTableName = databaseName + "." + tableName
>     hiveContext.sql("DROP TABLE IF EXISTS " + fullTableName)
>     hiveContext.sql("DROP DATABASE IF EXISTS " + databaseName)
>     hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + databaseName)
>     hiveContext.sql(
>       s"""CREATE TABLE IF NOT EXISTS $fullTableName
>          | (x string) stored as parquet
>        """.stripMargin)
>     // Fails with java.io.IOException: No input paths specified in job
>     hiveContext.table(fullTableName).count()
>     sc.stop()
>   }
> }
> {code}
> build.sbt:
> {code}
> name := "test-issue"
> version := "1.0"
> scalaVersion := "2.10.5"
> val sparkVersion = "1.6.3"
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion,
>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>   "org.apache.spark" %% "spark-hive" % sparkVersion
> )
> {code}
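> For comparison, a roughly equivalent Spark 2.x program (an untested sketch; the App2 object name is invented here, and SparkSession replaces HiveContext) should count the empty table without error, consistent with the note above that Spark 2 is unaffected:
> {code}
> import org.apache.spark.sql.SparkSession
>
> object App2 {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .appName("Testing Issue on Spark 2")
>       .master("local[*]")
>       .enableHiveSupport()
>       .getOrCreate()
>     // Same configuration override as the 1.6.x reproduction above.
>     spark.conf.set("spark.sql.parquet.cacheMetadata", "false")
>     val fullTableName = "my_test_db.test_table"
>     spark.sql("CREATE DATABASE IF NOT EXISTS my_test_db")
>     spark.sql(s"CREATE TABLE IF NOT EXISTS $fullTableName (x string) STORED AS PARQUET")
>     // Expected to print 0 rather than throwing an IOException.
>     println(spark.table(fullTableName).count())
>     spark.stop()
>   }
> }
> {code}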


