Posted to user@spark.apache.org by "Manjunath, Kiran" <ki...@akamai.com> on 2016/11/04 20:46:29 UTC

GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

I am trying to implement a sample “sum” functionality over a rolling window.
The code below may not make much sense (and may not be efficient), but in the course of a larger implementation I have stumbled on the error below, which is blocking.

Error obtained: “GenericRowWithSchema cannot be cast to java.lang.Double” during evaluation.
Is this a known problem in Spark?
I am using 2.0.0.

Code
====

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types._

class GeometricMean extends UserDefinedAggregateFunction {

    // One Double input column; the buffer holds an array of Doubles.
    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    def bufferSchema: StructType = new StructType().add("info", ArrayType(DoubleType), false)

    def dataType: DataType = DoubleType
    def deterministic: Boolean = true

    // Start each buffer with an empty array.
    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = ArrayBuffer.empty[Double]
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val arr1 = buffer.getAs[Seq[Double]](0)
        // ArrayBuffer(input) prepends the incoming Row itself to the buffered values.
        val arr = ArrayBuffer(input) ++ arr1
        buffer(0) = arr
    }

    // Concatenate the arrays of two partial buffers.
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val arr1 = buffer1.getAs[Seq[Double]](0)
        val arr = arr1 ++ buffer2.getAs[Seq[Double]](0)
        buffer1.update(0, arr)
    }

    // Sum the buffered values; the cast to Double happens in toArray.
    def evaluate(buffer: Row): Any = {
        var s: Double = 0
        val arr = buffer.getAs[Seq[Double]](0)
        val arrd = arr.toArray
        arrd.foreach(s += _)
        s
    }
}

val GM = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id", IntegerType).add("Count", IntegerType)

// Ten rows of (id, random count in [0, 10]).
val rnNum = for (i <- 1 to 10) yield { Row(i, r.nextInt(11)) }

// Window spanning the previous row through the next three rows, ordered by id.
val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3)
val rdd = sc.parallelize(rnNum)
val df = sqlContext.createDataFrame(rdd, schema)
val dfWithMovingAvg = df
    .withColumn("movingAvg", avg(df.col("Count")).over(wSpec1))
    .withColumn("customMovingAvg", GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)


Error
=====


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)



Regards,
Kiran
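
The exception names GenericRowWithSchema, Spark's internal Row implementation, so the aggregation buffer ends up holding whole Rows rather than Doubles. The likely culprit is ArrayBuffer(input) in update, which buffers the input Row itself instead of the Double it carries. A minimal corrected sketch of update, assuming the single DoubleType column declared in inputSchema:

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // Extract the Double from the input Row rather than buffering the Row itself.
        if (!input.isNullAt(0)) {
            val arr1 = buffer.getAs[Seq[Double]](0)
            buffer(0) = arr1 :+ input.getDouble(0)
        }
    }

With that change the buffer genuinely holds Doubles, so merge and evaluate should work unchanged.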

Re: GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

Posted by "Manjunath, Kiran" <ki...@akamai.com>.
Just to add more clarity on where the issue occurs:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:93)
at scala.collection.IndexedSeqOptimized$class.copyToArray(IndexedSeqOptimized.scala:180)
at scala.collection.mutable.WrappedArray.copyToArray(WrappedArray.scala:35)
at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:104)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at scala.collection.mutable.WrappedArray.toArray(WrappedArray.scala:73)
at GeometricMean.evaluate(<console>:51)


Regards,
Kiran
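
The deeper frames show the cast failing inside WrappedArray.toArray while evaluate copies the buffer into an Array[Double], which confirms that the buffered elements are Rows, not Doubles. The same failure can be reproduced outside the aggregation machinery; a hypothetical snippet for illustration:

    import org.apache.spark.sql.Row

    // Erasure lets a Seq of Rows masquerade as Seq[Double]; the cast only
    // fails later, when toArray unboxes each element into an Array[Double].
    val arr: Seq[Double] = Seq(Row(1.0)).asInstanceOf[Seq[Double]]
    arr.toArray  // java.lang.ClassCastException: ... cannot be cast to java.lang.Double

This matches the buffer contents produced by ArrayBuffer(input) in update.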

From: "Manjunath, Kiran" <ki...@akamai.com>
Date: Saturday, November 5, 2016 at 2:16 AM
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

I am trying to implement a sample “sum” functionality over rolling window.
Below code may not make sense (may not be efficient) but during the course of other major implementation, have stumbled on below error which is blocking.

Error Obtained -  “GenericRowWithSchema cannot be cast to java.lang.Double” during evaluation.
Is this a known problem in Spark?
I am using 2.0.0

Code
====

class GeometricMean extends UserDefinedAggregateFunction {

    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    def bufferSchema: StructType = new StructType().add("info", ArrayType(DoubleType),false)


    def dataType : DataType =  DoubleType
    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = ArrayBuffer.empty[Double]

    }

    def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
        val arr1 = buffer.getAs[Seq[Double]](0)
        val arr = ArrayBuffer(input) ++ arr1
        buffer(0) = arr
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val arr1 = buffer1.getAs[Seq[Double]](0)
        val arr = arr1 ++  buffer2.getAs[Seq[Double]](0)
        buffer1.update(0,arr)
    }

    def evaluate(buffer: Row): Any = {
        var s : Double = 0
        val arr = buffer.getAs[Seq[Double]](0)
        val arrd = arr.toArray
        arrd.foreach(s += _)
        s
    }
}

val GM  = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id",IntegerType).add("Count",IntegerType)

val rnNum1 = for( i <- 1 to 10) yield { Row(i,r.nextInt(10-0+1)) }

val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3)
val rdd = sc.parallelize(rnNum)
val df = sqlContext.createDataFrame(rdd,schema)
val dfWithMovingAvg = df.withColumn( "movingAvg",avg(df.col("Count")).over(wSpec1)).withColumn("customMovingAvg",GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)


Error
===


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)



Regards,
Kiran