Posted to user@spark.apache.org by Tao Xiao <xi...@gmail.com> on 2014/09/15 08:06:44 UTC

combineByKey throws ClassCastException

I followed an example presented in the tutorial Learning Spark
<http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>
to compute the per-key average as follows:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // implicit conversions that provide combineByKey on pair RDDs

val Array(appName) = args
val sparkConf = new SparkConf().setAppName(appName)
val sc = new SparkContext(sparkConf)

/*
 * Compute the per-key average of values.
 * Results should be:
 *    A : 5.8
 *    B : 14
 *    C : 60.6
 */
val rdd = sc.parallelize(List(
  ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
  ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
  ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)

val avg = rdd.combineByKey(
    // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to
    // java.lang.Integer (reported for the next line)
    (x: Int) => (x, 1),
    (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  .map { case (s, t) => (s, t._1 / t._2.toFloat) }

avg.collect.foreach(t => println(t._1 + " ->" + t._2))



When I submitted the application, it threw
"*java.lang.ClassCastException:
scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*".
The tutorial says that the first function of *combineByKey*, *(x:Int) =>
(x, 1)*, should take a single element from the source RDD and return an
element of the desired type for the resulting RDD. In my application, it
takes a single value of type *Int* from the source RDD and returns a tuple
of type (*Int*, *Int*), which meets that requirement. So why
is this exception thrown?
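
For readers following along, here is a minimal local sketch of what the three functions passed to combineByKey are expected to do. It does not use Spark at all: combinePartition, combineByKeyLocal and averages are hypothetical helpers written only for illustration, and splitting the data with grouped(8) is an assumption standing in for the two partitions requested above. It is not how Spark implements the operation.

```scala
// Local, Spark-free model of combineByKey semantics (illustrative only).
object CombineByKeySketch {
  val data: Seq[(String, Int)] = Seq(
    ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
    ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
    ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55))

  // Hypothetical helper: folds one partition into per-key combiners.
  // createCombiner runs on the first value seen for a key, mergeValue afterwards.
  def combinePartition[K, V, C](
      part: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))
        case None    => acc.updated(k, createCombiner(v))
      }
    }

  // Hypothetical helper: merges the per-partition results with mergeCombiners.
  def combineByKeyLocal[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] =
    partitions
      .map(p => combinePartition(p, createCombiner, mergeValue))
      .foldLeft(Map.empty[K, C]) { (merged, partial) =>
        partial.foldLeft(merged) { case (m, (k, c)) =>
          m.updated(k, m.get(k).map(mergeCombiners(_, c)).getOrElse(c))
        }
      }

  // Per-key average built from (sum, count) combiners, as in the question.
  def averages(partitions: Seq[Seq[(String, Int)]]): Map[String, Double] =
    combineByKeyLocal[String, Int, (Int, Int)](
      partitions,
      (x: Int) => (x, 1),
      (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
      .map { case (k, (sum, count)) => (k, sum.toDouble / count) }

  def main(args: Array[String]): Unit = {
    val partitions = data.grouped(8).toSeq // assumption: two local "partitions"
    averages(partitions).toSeq.sortBy(_._1)
      .foreach { case (k, v) => println(k + " -> " + v) }
  }
}
```

In this model the value type (Int) and the combiner type ((Int, Int)) differ, which is exactly what the first function is allowed to do, so the function signatures in the question look consistent with the documented contract.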

I'm using CDH 5.0 and Spark 0.9.

Thanks.

Re: combineByKey throws ClassCastException

Posted by Tao Xiao <xi...@gmail.com>.
This problem was caused by my using a package jar built for a Spark
version (0.9.1) different from that of the cluster (0.9.0). After I switched
to the correct package jar
(spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar), the
application ran as expected.
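
One way to avoid this kind of mismatch is to compile against the same Spark version the cluster runs and to keep Spark out of the application jar. The build.sbt fragment below is only a sketch of that idea: the Scala version, the sparkVersion string (guessed from the assembly jar name above) and the resolver URL are all assumptions and need to be checked against the actual cluster.

```scala
// build.sbt (sketch; every version string and the resolver are assumptions)
scalaVersion := "2.10.4"

// Assumed to match the cluster's Spark build; verify before relying on it.
val sparkVersion = "0.9.0-cdh5.0.1"

// Assumed location of vendor-specific artifacts.
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

// "provided" keeps Spark out of the packaged jar, so the cluster's own
// Spark classes are the ones used at run time.
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
```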



2014-09-15 14:57 GMT+08:00 x <wa...@gmail.com>:

> How about this.
>
> scala> val rdd2 = rdd.combineByKey(
>      | (v: Int) => v.toLong,
>      | (c: Long, v: Int) => c + v,
>      | (c1: Long, c2: Long) => c1 + c2)
> rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at <console>:14
>
> xj @ Tokyo

Re: combineByKey throws ClassCastException

Posted by x <wa...@gmail.com>.
How about this.

scala> val rdd2 = rdd.combineByKey(
     | (v: Int) => v.toLong,
     | (c: Long, v: Int) => c + v,
     | (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at <console>:14

xj @ Tokyo
