Posted to dev@spark.apache.org by Nick Pentreath <ni...@gmail.com> on 2016/04/06 13:35:43 UTC

ClassCastException when extracting and collecting DF array column type

Hi there,

While writing some tests for a PR I'm working on, using a more complex array
type in a DF, I ran into this issue (running off latest master).

Any thoughts?

*// create DF with a column of Array[(Int, Double)]*
val df = sc.parallelize(Seq(
  (0, Array((1, 6.0), (1, 4.0))),
  (1, Array((1, 3.0), (2, 1.0))),
  (2, Array((3, 3.0), (4, 6.0))))
).toDF("id", "predictions")

*// extract the field from the Row, and use map to extract the first element of the tuple*
*// the type of the RDD appears correct*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }
res14: org.apache.spark.rdd.RDD[Seq[Int]] = MapPartitionsRDD[32] at map at <console>:27

*// however, map is lazy, so the bad cast only happens when an action forces evaluation:*
*// calling collect on the same expression throws a ClassCastException*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1).map(_._1) }.collect
16/04/06 13:02:49 ERROR Executor: Exception in task 5.0 in stage 10.0 (TID 74)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
at $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1$$anonfun$apply$1.apply(<console>:27)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:27)
at $line54.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:27)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:880)

*// can collect the extracted field*
*// again, return type appears correct*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1) }.collect
res23: Array[Seq[(Int, Double)]] = Array(WrappedArray([1,6.0], [1,4.0]), WrappedArray([1,3.0], [2,1.0]), WrappedArray([3,3.0], [4,6.0]))

*// trying to map over the collected result to extract the first tuple element fails*
scala> df.rdd.map { row => row.getSeq[(Int, Double)](1) }.collect.map(_.map(_._1))
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
  at $anonfun$2$$anonfun$apply$1.apply(<console>:27)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at $anonfun$2.apply(<console>:27)
  at $anonfun$2.apply(<console>:27)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)

Re: ClassCastException when extracting and collecting DF array column type

Posted by Nick Pentreath <ni...@gmail.com>.
Ah, I got it: a Seq[(Int, Double)] column is actually represented internally as Seq[Row] (a seq of struct type), since tuple elements are encoded as structs.
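
For reference, printing the schema of the DF above shows this struct encoding. A sketch of the expected output (_1/_2 are Spark's default field names for tuple elements; exact nullability flags may differ):

scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- predictions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: double (nullable = false)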

So a further extraction is required, e.g. row => row.getSeq[Row](1).map { r => r.getInt(0) }
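
Putting it together, a minimal sketch of a working version of the snippet from the original mail (assumes the same df as above; Row is org.apache.spark.sql.Row, and the expected-result comment follows from the sample data):

import org.apache.spark.sql.Row

// each array element is materialized as a Row (struct), so extract the
// struct fields with the Row getters instead of casting to Tuple2
val firstElems = df.rdd
  .map(row => row.getSeq[Row](1).map(r => r.getInt(0)))
  .collect()
// firstElems: Array[Seq[Int]] = Array(WrappedArray(1, 1), WrappedArray(1, 2), WrappedArray(3, 4))

r.getAs[Int]("_1") also works, if you prefer field names over positions.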

