You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Paul Magid <Pa...@toyota.com> on 2014/09/18 18:31:17 UTC

Spark SQL Exception

All:

I am putting Spark SQL 1.1 through its paces (in a POC) and have been pleasantly surprised with what can be done with such a young technology.    I have run into an exception (listed below) that I suspect relates to the number of columns in the table I am querying.   There are 336 columns in the table.   I have included the Scala / Spark SQL I am running.  This Spark SQL code runs just fine when run against "narrower" tables.   Also, we have purpose built this POC cluster with lots of memory and we have set up Impala and Spark SQL with roughly the same amounts of memory.   There are 7 worker nodes with 20GB memory for Impala and Spark SQL each.  We are using Impala as a comparative benchmark and sanity check.  The equivalent SQL runs just fine in Impala (see below).   I am a bit of a noob and any help (even with the code below) is greatly appreciated.  Also, is there a document that lists current Spark SQL limitations/issues?

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I R&D
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12


Successful Result In Impala
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+----------------+
| marital_status |
+----------------+
| M              |
| S              |
| U              |
| null           |
+----------------+
Returned 4 row(s) in 0.91s

Code
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
//Timer code
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0).toFloat/1000000000 + "s")
    result
}

//Declare and import SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

//Load Parquet file into a table
val parquetFile_db2 = sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
parquetFile_db2.registerAsTable("customer_demographic_pq")

//Run SQL code with timer
val records= time {sql("select marital_status from customer_demographic_pq group by marital_status order by marital_status ").collect().foreach(println)}


Exception
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at Exchange.scala:79, took 21.885859255 s
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [marital_status#9 ASC], true
Exchange (RangePartitioning [marital_status#9 ASC], 200)
  Aggregate false, [marital_status#9], [marital_status#9]
   Exchange (HashPartitioning [marital_status#9], 200)
    Aggregate true, [marital_status#9], [marital_status#9]
     ParquetTableScan [marital_status#9], (ParquetRelation hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
        at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
        at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
        at $iwC$$iwC$$iwC.<init>(<console>:28)
        at $iwC$$iwC.<init>(<console>:30)
        at $iwC.<init>(<console>:32)
        at <init>(<console>:34)
        at .<init>(<console>:38)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (RangePartitioning [marital_status#9 ASC], 200)
Aggregate false, [marital_status#9], [marital_status#9]
  Exchange (HashPartitioning [marital_status#9], 200)
   Aggregate true, [marital_status#9], [marital_status#9]
    ParquetTableScan [marital_status#9], (ParquetRelation hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
        ... 49 more
Caused by: scala.MatchError: BinaryType (of class org.apache.spark.sql.catalyst.types.BinaryType$)
        at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:256)
        at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:238)
        at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
        at java.util.TimSort.countRunAndMakeAscending(Unknown Source)
        at java.util.TimSort.sort(Unknown Source)
        at java.util.TimSort.sort(Unknown Source)
        at java.util.Arrays.sort(Unknown Source)
        at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
        at scala.collection.AbstractSeq.sorted(Seq.scala:40)
        at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
        at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
        at org.apache.spark.RangePartitioner$.determineBounds(Partitioner.scala:279)
        at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:152)
        at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:79)
        at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
        ... 53 more

RE: Spark SQL Exception

Posted by Paul Magid <Pa...@toyota.com>.
Michael:

Thanks for the quick response.   I can confirm that once I removed the “order by” clause the exception below went away.   So, I believe this confirms what you were say and I will be opening a new feature request in JIRA.

However, that exception was replaced by a java.lang.OutOfMemoryError: Java heap space error.   I am guessing this relates to any of the following Issues:
SPARK-2902 Change default options to be more agressive (In memory columnar compression)
SPARK-3056 Sort-based Aggregation (SparkSQL only support the hash-based aggregation, which may cause OOM if too many identical keys in the input tuples.)
SPARK-2926 Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

The Exception is included below.

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I R&D
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12

Exception
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
14/09/18 10:11:03 INFO TaskSetManager: Finished task 36.0 in stage 0.0 (TID 57) in 18681 ms on votlbdcd04.tms.toyota.com (5/200)
14/09/18 10:11:09 ERROR Utils: Uncaught exception in thread Result resolver thread-0
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Result resolver thread-0" 14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
java.lang.OutOfMemoryError: Java heap space
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/09/18 10:11:09 INFO Remoting: Remoting shut down
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
        at akka.actor.ActorCell.terminate(ActorCell.scala:338)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala> 14/09/18 10:11:09 INFO TaskSetManager: Finished task 50.0 in stage 0.0 (TID 71) in 27100 ms on votlbdcd04.tms.toyota.com (6/200)
14/09/18 10:11:10 INFO TaskSetManager: Finished task 22.0 in stage 0.0 (TID 43) in 27520 ms on votlbdcd04.tms.toyota.com (7/200)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@7a542d34
14/09/18 10:11:10 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@7a542d34
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd06.tms.toyota.com,19998)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd06.tms.toyota.com,19998)
14/09/18 10:11:10 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(votlbdcd06.tms.toyota.com,19998) not found
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd04.tms.toyota.com,25043)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd04.tms.toyota.com,25043)
14/09/18 10:11:10 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(votlbdcd04.tms.toyota.com,25043) not found
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd07.tms.toyota.com,28529)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd07.tms.toyota.com,28529)
14/09/18 10:11:10 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(votlbdcd07.tms.toyota.com,28529) not found
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd02.tms.toyota.com,16101)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd02.tms.toyota.com,16101)
14/09/18 10:11:10 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(votlbdcd02.tms.toyota.com,16101) not found
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(votlbdcd08.tms.toyota.com,49294)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(votlbdcd08.tms.toyota.com,49294)
14/09/18 10:11:10 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(votlbdcd08.tms.toyota.com,49294) not found

From: Michael Armbrust [mailto:michael@databricks.com]
Sent: Thursday, September 18, 2014 9:47 AM
To: Paul Magid
Cc: user@spark.apache.org; Brian Kursar (TMS)
Subject: Re: Spark SQL Exception

Its failing to sort because the columns are of Binary type (though maybe we should support this as well).  Is this parquet data that was generated by impala that you would expect to be a String?  If so turn on spark.sql.parquet.binaryAsString<http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration>. Otherwise you could file a JIRA asking us to add support for sorting binary data (or both :) ).

Michael

On Thu, Sep 18, 2014 at 9:31 AM, Paul Magid <Pa...@toyota.com>> wrote:
All:

I am putting Spark SQL 1.1 through its paces (in a POC) and have been pleasantly surprised with what can be done with such a young technology.    I have run into an exception (listed below) that I suspect relates to the number of columns in the table I am querying.   There are 336 columns in the table.   I have included the Scala / Spark SQL I am running.  This Spark SQL code runs just fine when run against “narrower” tables.   Also, we have purpose built this POC cluster with lots of memory and we have set up Impala and Spark SQL with roughly the same amounts of memory.   There are 7 worker nodes with 20GB memory for Impala and Spark SQL each.  We are using Impala as a comparative benchmark and sanity check.  The equivalent SQL runs just fine in Impala (see below).   I am a bit of a noob and any help (even with the code below) is greatly appreciated.  Also, is there a document that lists current Spark SQL limitations/issues?

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I R&D
Ph: 310-468-9091<tel:310-468-9091> (X69091)
PCN 1C2970, Mail Drop PN12


Successful Result In Impala
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+----------------+
| marital_status |
+----------------+
| M              |
| S              |
| U              |
| null           |
+----------------+
Returned 4 row(s) in 0.91s

Code
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
//Timer code
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0).toFloat/1000000000 + "s")
    result
}

//Declare and import SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

//Load Parquet file into a table
val parquetFile_db2 = sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
parquetFile_db2.registerAsTable("customer_demographic_pq")

//Run SQL code with timer
val records= time {sql("select marital_status from customer_demographic_pq group by marital_status order by marital_status ").collect().foreach(println)}


Exception
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at Exchange.scala:79, took 21.885859255 s
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [marital_status#9 ASC], true
Exchange (RangePartitioning [marital_status#9 ASC], 200)
  Aggregate false, [marital_status#9], [marital_status#9]
   Exchange (HashPartitioning [marital_status#9], 200)
    Aggregate true, [marital_status#9], [marital_status#9]
     ParquetTableScan [marital_status#9], (ParquetRelation hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de<ma...@4d79d3de>, []), []

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
        at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
        at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
        at $iwC$$iwC$$iwC.<init>(<console>:28)
        at $iwC$$iwC.<init>(<console>:30)
        at $iwC.<init>(<console>:32)
        at <init>(<console>:34)
        at .<init>(<console>:38)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (RangePartitioning [marital_status#9 ASC], 200)
Aggregate false, [marital_status#9], [marital_status#9]
  Exchange (HashPartitioning [marital_status#9], 200)
   Aggregate true, [marital_status#9], [marital_status#9]
    ParquetTableScan [marital_status#9], (ParquetRelation hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de<ma...@4d79d3de>, []), []

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
        ... 49 more
Caused by: scala.MatchError: BinaryType (of class org.apache.spark.sql.catalyst.types.BinaryType$)
        at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:256)
        at org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:238)
        at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
        at java.util.TimSort.countRunAndMakeAscending(Unknown Source)
        at java.util.TimSort.sort(Unknown Source)
        at java.util.TimSort.sort(Unknown Source)
        at java.util.Arrays.sort(Unknown Source)
        at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
        at scala.collection.AbstractSeq.sorted(Seq.scala:40)
        at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
        at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
        at org.apache.spark.RangePartitioner$.determineBounds(Partitioner.scala:279)
        at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:152)
        at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:79)
        at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
        ... 53 more


Re: Spark SQL Exception

Posted by Michael Armbrust <mi...@databricks.com>.
Its failing to sort because the columns are of Binary type (though maybe we
should support this as well).  Is this parquet data that was generated by
impala that you would expect to be a String?  If so turn on
spark.sql.parquet.binaryAsString
<http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration>
. Otherwise you could file a JIRA asking us to add support for sorting
binary data (or both :) ).

Michael

On Thu, Sep 18, 2014 at 9:31 AM, Paul Magid <Pa...@toyota.com> wrote:

>  All:
>
>
>
> I am putting Spark SQL 1.1 through its paces (in a POC) and have been
> pleasantly surprised with what can be done with such a young technology.
> I have run into an exception (listed below) that I suspect relates to the
> number of columns in the table I am querying.   There are 336 columns in
> the table.   I have included the Scala / Spark SQL I am running.  This
> Spark SQL code runs just fine when run against “narrower” tables.   Also,
> we have purpose built this POC cluster with lots of memory and we have set
> up Impala and Spark SQL with roughly the same amounts of memory.   There
> are 7 worker nodes with 20GB memory for Impala and Spark SQL each.  We are
> using Impala as a comparative benchmark and sanity check.  The equivalent
> SQL runs just fine in Impala (see below).   I am a bit of a noob and any
> help (even with the code below) is greatly appreciated.  Also, is there a
> document that lists current Spark SQL limitations/issues?
>
>
>
> Paul Magid
>
> Toyota Motor Sales IS Enterprise Architecture (EA)
>
> Architect I R&D
>
> Ph: 310-468-9091 (X69091)
>
> PCN 1C2970, Mail Drop PN12
>
>
>
>
>
> *Successful Result In Impala*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> +----------------+
>
> | marital_status |
>
> +----------------+
>
> | M              |
>
> | S              |
>
> | U              |
>
> | null           |
>
> +----------------+
>
> Returned 4 row(s) in 0.91s
>
>
>
> *Code*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> //Timer code
>
> def time[R](block: => R): R = {
>
>     val t0 = System.nanoTime()
>
>     val result = block    // call-by-name
>
>     val t1 = System.nanoTime()
>
>     println("Elapsed time: " + (t1 - t0).toFloat/1000000000 + "s")
>
>     result
>
> }
>
>
>
> //Declare and import SQLContext
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext._
>
>
>
> //Load Parquet file into a table
>
> val parquetFile_db2 =
> sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
>
> parquetFile_db2.registerAsTable("customer_demographic_pq")
>
>
>
> //Run SQL code with timer
>
> val records= time {sql("select marital_status from customer_demographic_pq
> group by marital_status order by marital_status
> ").collect().foreach(println)}
>
>
>
>
>
> *Exception*
>
>
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> 14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at
> Exchange.scala:79, took 21.885859255 s
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
>
> Sort [marital_status#9 ASC], true
>
> Exchange (RangePartitioning [marital_status#9 ASC], 200)
>
>   Aggregate false, [marital_status#9], [marital_status#9]
>
>    Exchange (HashPartitioning [marital_status#9], 200)
>
>     Aggregate true, [marital_status#9], [marital_status#9]
>
>      ParquetTableScan [marital_status#9], (ParquetRelation
> hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/,
> Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
> mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
> hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []
>
>
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
>
>         at
> org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
>
>         at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>
>         at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
>
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
>
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
>
>         at $iwC$$iwC$$iwC.<init>(<console>:28)
>
>         at $iwC$$iwC.<init>(<console>:30)
>
>         at $iwC.<init>(<console>:32)
>
>         at <init>(<console>:34)
>
>         at .<init>(<console>:38)
>
>         at .<clinit>(<console>)
>
>         at .<init>(<console>:7)
>
>         at .<clinit>(<console>)
>
>         at $print(<console>)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
>         at java.lang.reflect.Method.invoke(Unknown Source)
>
>         at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>
>         at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>
>         at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>
>         at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
>
>         at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
>
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
>
>         at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
>
>         at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
>
>         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>         at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
>
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
>
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>
>         at org.apache.spark.repl.Main.main(Main.scala)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
>         at java.lang.reflect.Method.invoke(Unknown Source)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
>
> Exchange (RangePartitioning [marital_status#9 ASC], 200)
>
> Aggregate false, [marital_status#9], [marital_status#9]
>
>   Exchange (HashPartitioning [marital_status#9], 200)
>
>    Aggregate true, [marital_status#9], [marital_status#9]
>
>     ParquetTableScan [marital_status#9], (ParquetRelation
> hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/,
> Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
> mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
> hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []
>
>
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
>
>         at
> org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
>
>         at
> org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
>
>         at
> org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
>
>         ... 49 more
>
> Caused by: scala.MatchError: BinaryType (of class
> org.apache.spark.sql.catalyst.types.BinaryType$)
>
>         at
> org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:256)
>
>         at
> org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:238)
>
>         at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
>
>         at java.util.TimSort.countRunAndMakeAscending(Unknown Source)
>
>         at java.util.TimSort.sort(Unknown Source)
>
>         at java.util.TimSort.sort(Unknown Source)
>
>         at java.util.Arrays.sort(Unknown Source)
>
>         at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
>
>         at scala.collection.AbstractSeq.sorted(Seq.scala:40)
>
>         at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
>
>         at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
>
>         at
> org.apache.spark.RangePartitioner$.determineBounds(Partitioner.scala:279)
>
>         at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:152)
>
>         at
> org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:79)
>
>         at
> org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45)
>
>         at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
>
>         ... 53 more
>