You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Tom Ogle (JIRA)" <ji...@apache.org> on 2017/06/21 14:56:00 UTC

[jira] [Created] (SPARK-21162) Cannot count rows in an empty Hive table stored as parquet when spark.sql.parquet.cacheMetadata is set to false

Tom Ogle created SPARK-21162:
--------------------------------

             Summary: Cannot count rows in an empty Hive table stored as parquet when spark.sql.parquet.cacheMetadata is set to false
                 Key: SPARK-21162
                 URL: https://issues.apache.org/jira/browse/SPARK-21162
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.3, 1.6.2
            Reporter: Tom Ogle


With spark.sql.parquet.cacheMetadata set to false, creating an empty Hive table stored as Parquet and then trying to count the rows using SparkSQL throws an IOException. The issue does not affect Spark 2. This issue is inconvenient in environments using Spark 1.6.x where spark.sql.parquet.cacheMetadata is explicitly set to false for some reason, such as in Google DataProc 1.0.

Here is the stacktrace:

{code}
17/06/21 15:30:10 INFO ParquetRelation: Reading Parquet file(s) from 
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#30L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
      +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: <snip>/my_test_db.db/test_table

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
	at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
	at App$.main(App.scala:23)
	at App.main(App.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
   +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: <snip>/my_test_db.db/test_table

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
	at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
	... 19 more
Caused by: java.io.IOException: No input paths specified in job
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
	at org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:339)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1$$anon$4.listStatus(ParquetRelation.scala:358)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
	at org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:294)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.getPartitions(ParquetRelation.scala:363)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
	at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
	... 27 more
{code}

Here is some Scala code to reproduce the issue locally:

App.scala:
{code}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Testing Issue").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("spark.sql.parquet.cacheMetadata", "false")

    val databaseName = "my_test_db"
    val tableName = "test_table"
    val fullTableName = databaseName + "." + tableName

    hiveContext.sql("DROP TABLE IF EXISTS " + fullTableName)
    hiveContext.sql("DROP DATABASE IF EXISTS " + databaseName)
    hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + databaseName)
    hiveContext.sql(
      s"""CREATE TABLE IF NOT EXISTS $fullTableName
         | (x string) stored as parquet
       """.stripMargin)

    hiveContext.table(fullTableName).count()
    sc.stop()
  }
}
{code}
build.sbt:
{code}
name := "test-issue"

version := "1.0"

scalaVersion := "2.10.5"

val sparkVersion = "1.6.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion
)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org