Posted to issues@spark.apache.org by "Glenn Strycker (JIRA)" <ji...@apache.org> on 2015/09/23 16:26:04 UTC

[jira] [Resolved] (SPARK-10762) GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

     [ https://issues.apache.org/jira/browse/SPARK-10762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Glenn Strycker resolved SPARK-10762.
------------------------------------
    Resolution: Not A Problem

Instead of
{code}
a(4).asInstanceOf[scala.collection.mutable.ArrayBuffer[(Int,String)]]
{code}
I should be using
{code}
a(4).asInstanceOf[ArrayBuffer[Row]].map{case x:Row => (x(0).asInstanceOf[Int],x(1).asInstanceOf[String])}
{code}
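For context, here is a sketch of how that fix slots into the full pipeline from the report below (names such as tempHiveQL and tempRDD3 are taken from the issue; this is a reconstruction, not verbatim code):
{code}
import scala.collection.mutable.ArrayBuffer
import scala.collection.immutable.HashSet
import org.apache.spark.sql.Row

// The elements of the array<struct<a:int,b:string>> column come back as Rows,
// so convert each nested Row to a tuple before building the set.
val tempRDDfromHive = tempHiveQL.map { a =>
  ((a(0).toString.toInt, a(1).toString),
   ((a(2).toString.toInt, a(3).toString),
    a(4).asInstanceOf[ArrayBuffer[Row]]
        .map { case x: Row => (x(0).asInstanceOf[Int], x(1).asInstanceOf[String]) }))
}

// Equivalent to the var/foreach accumulation used in the report below.
val tempRDD3 = tempRDDfromHive.map { case (key, (innerKey, pairs)) =>
  (key, (innerKey, pairs.foldLeft(HashSet.empty[(Int, String)])(_ + _)))
}
{code}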

> GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-10762
>                 URL: https://issues.apache.org/jira/browse/SPARK-10762
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>
> I have a Hive table in parquet format that was generated using
> {code}
> create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
> {code}
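> (For reproduction from spark-shell, the same DDL can also be issued through a HiveContext -- a sketch, assuming the HiveContext hc that is created further down in this report:)
> {code}
> // Sketch: create the same table from spark-shell via HiveContext.sql
> hc.sql("""create table myTable (var1 int, var2 string, var3 int, var4 string,
>           var5 array<struct<a:int,b:string>>) stored as parquet""")
> {code}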
> I verified that the table was populated -- here is a sample value:
> {code}
> [1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
> {code}
> I wish to put this into a Spark RDD of the form
> {code}
> ((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
> {code}
> Now, using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values:
> {code}
> scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
> tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
> {code}
> Using an iterator over the buffer, I can convert the ArrayBuffer to a HashSet in the following new RDD:
> {code}
> scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
> tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87
> scala> tempRDD2.collect.foreach(println)
> ((1,abcdef),((2,ghijkl),Set((1,hello))))
> {code}
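> (As an aside, the same ArrayBuffer-to-set conversion can be written without the mutable accumulator -- a sketch over the same tempRDD:)
> {code}
> // Sketch: equivalent conversion using toSet; this yields an immutable Set
> // rather than an explicit HashSet, which prints identically for this data.
> val tempRDD2b = tempRDD.mapValues { case (innerKey, buf) => (innerKey, buf.toSet) }
> tempRDD2b.collect.foreach(println)
> // ((1,abcdef),((2,ghijkl),Set((1,hello))))
> {code}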
> But when I attempt to do the EXACT SAME THING with a DataFrame obtained through a HiveContext / SQLContext, I get the following error:
> {code}
> scala> val hc = new HiveContext(sc)
> scala> import hc._
> scala> import hc.implicits._
> scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
> scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
> scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
> tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91
> scala> tempRDD3.collect.foreach(println)
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
>        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
>        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
>        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>        at org.apache.spark.scheduler.Task.run(Task.scala:64)
>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>        at java.lang.Thread.run(Thread.java:724)
> Driver stacktrace:
>        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>        at scala.Option.foreach(Option.scala:236)
>        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> Note that I get the same "GenericRowWithSchema cannot be cast to scala.Tuple2" error when I run this as a compiled program via spark-submit. The program crashes at RUN TIME when it reaches the conversion step; there were no compile-time errors.
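> (A hypothetical diagnostic makes the cause visible: because of type erasure, the asInstanceOf[ArrayBuffer[(Int,String)]] cast succeeds at the collection level, and the elements, which are actually Rows, only fail once they are consumed in the second map. A sketch:)
> {code}
> // Hypothetical diagnostic: inspect what the array<struct<...>> column really
> // contains.  Each element is a GenericRowWithSchema, not a Scala tuple, which
> // is why the ClassCastException surfaces only when an element is touched.
> tempHiveQL.map(a => a(4).asInstanceOf[Seq[_]].map(_.getClass.getName)).first
> // e.g. ArrayBuffer(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> {code}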
> It seems very strange to me that my artificially generated RDD "tempRDD" works with the conversion, whereas the RDD derived from the Hive query DataFrame does not. I checked, and both RDDs have the same form:
> {code}
> scala> tempRDD
> org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
> scala> tempRDDfromHive
> org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
> {code}
> The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the tempRDD2 and tempRDD3 steps; all attempts produced the same error message.
> I also read through related Stack Overflow questions and Apache Spark JIRA issues, and based on those I attempted casting the ArrayBuffer to an Iterator instead, but that also failed on the second step with the same error.
> Since the error occurs only for the Hive table version, I'm tempted to think this is an issue with the Spark/Hive integration in Spark SQL.
> Possibly related Apache Spark Jira Issues:
> https://issues.apache.org/jira/browse/SPARK-1040
> https://issues.apache.org/jira/browse/SPARK-2737
> https://issues.apache.org/jira/browse/SPARK-4489 (still open)


