You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2017/03/24 00:18:11 UTC

Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

This is my input data. The UDAF needs to aggregate the goals for a team and
return a map that  gives the count for every goal in the team.
I am getting the following error

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
cannot be cast to [Ljava.lang.String;
at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)

+------+--------------+
|teamid|goals         |
+------+--------------+
|t1    |[Goal1, Goal2]|
|t1    |[Goal1, Goal3]|
|t2    |[Goal1, Goal2]|
|t3    |[Goal2, Goal3]|
+------+--------------+

root
 |-- teamid: string (nullable = true)
 |-- goals: array (nullable = true)
 |    |-- element: string (containsNull = true)

/////////////////////////Calling the UDAF//////////

object TestUDAF {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val sqlContext = spark.sqlContext

    import sqlContext.implicits._

    val data = Seq(
      ("t1", Seq("Goal1", "Goal2")),
      ("t1", Seq("Goal1", "Goal3")),
      ("t2", Seq("Goal1", "Goal2")),
      ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")

    data.show(truncate = false)
    data.printSchema()

    import spark.implicits._

    val sumgoals = new GoalAggregator
    val result = data.groupBy("teamid").agg(sumgoals(col("goals")))

    result.show(truncate = false)

  }
}

///////////////UDAF/////////////////

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GoalAggregator extends UserDefinedAggregateFunction{

  override def inputSchema: org.apache.spark.sql.types.StructType =
  StructType(StructField("value", ArrayType(StringType)) :: Nil)

  override def bufferSchema: StructType = StructType(
      StructField("combined", MapType(StringType,IntegerType)) :: Nil
  )

  override def dataType: DataType = MapType(StringType,IntegerType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, Map[String, Integer]())
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val mapbuf = buffer.getAs[Map[String, Int]](0)
    val arrayinput = input.getAs[Array[String]](0)
    val result = mapbuf ++ arrayinput.map(goal => {
      val cnt  = mapbuf.get(goal).getOrElse(0) + 1
      goal -> cnt
    })
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val map1 = buffer1.getAs[Map[String, Int]](0)
    val map2 = buffer2.getAs[Map[String, Int]](0)
    val result = map1 ++ map2.map { case (k,v) =>
      val cnt = map1.get(k).getOrElse(0) + 1
      k -> cnt
    }
    buffer1.update(0, result)
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[String, Int]](0)
  }
}

Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

Posted by Georg Heiler <ge...@gmail.com>.
Maybe an udf to flatten is an interesting option as well.
http://stackoverflow.com/q/42888711/2587904 would a uadf very more
performant?
shyla deshpande <de...@gmail.com> schrieb am Fr. 24. März 2017 um
04:04:

> Thanks a million Yong. Great help!!! It solved my problem.
>
> On Thu, Mar 23, 2017 at 6:00 PM, Yong Zhang <ja...@hotmail.com> wrote:
>
> Change:
>
> val arrayinput = input.getAs[Array[String]](0)
>
> to:
>
> val arrayinput = input.getAs[*Seq*[String]](0)
>
>
> Yong
>
>
> ------------------------------
> *From:* shyla deshpande <de...@gmail.com>
> *Sent:* Thursday, March 23, 2017 8:18 PM
> *To:* user
> *Subject:* Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!
>
> This is my input data. The UDAF needs to aggregate the goals for a team
> and return a map that  gives the count for every goal in the team.
> I am getting the following error
>
> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
> cannot be cast to [Ljava.lang.String;
> at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)
>
> +------+--------------+
> |teamid|goals         |
> +------+--------------+
> |t1    |[Goal1, Goal2]|
> |t1    |[Goal1, Goal3]|
> |t2    |[Goal1, Goal2]|
> |t3    |[Goal2, Goal3]|
> +------+--------------+
>
> root
>  |-- teamid: string (nullable = true)
>  |-- goals: array (nullable = true)
>  |    |-- element: string (containsNull = true)
>
> /////////////////////////Calling the UDAF//////////
>
> object TestUDAF {
>   def main(args: Array[String]): Unit = {
>
>     val spark = SparkSession
>       .builder
>       .getOrCreate()
>
>     val sc: SparkContext = spark.sparkContext
>     val sqlContext = spark.sqlContext
>
>     import sqlContext.implicits._
>
>     val data = Seq(
>       ("t1", Seq("Goal1", "Goal2")),
>       ("t1", Seq("Goal1", "Goal3")),
>       ("t2", Seq("Goal1", "Goal2")),
>       ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")
>
>     data.show(truncate = false)
>     data.printSchema()
>
>     import spark.implicits._
>
>     val sumgoals = new GoalAggregator
>     val result = data.groupBy("teamid").agg(sumgoals(col("goals")))
>
>     result.show(truncate = false)
>
>   }
> }
>
> ///////////////UDAF/////////////////
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> class GoalAggregator extends UserDefinedAggregateFunction{
>
>   override def inputSchema: org.apache.spark.sql.types.StructType =
>   StructType(StructField("value", ArrayType(StringType)) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>       StructField("combined", MapType(StringType,IntegerType)) :: Nil
>   )
>
>   override def dataType: DataType = MapType(StringType,IntegerType)
>
>   override def deterministic: Boolean = true
>
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer.update(0, Map[String, Integer]())
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     val mapbuf = buffer.getAs[Map[String, Int]](0)
>     val arrayinput = input.getAs[Array[String]](0)
>     val result = mapbuf ++ arrayinput.map(goal => {
>       val cnt  = mapbuf.get(goal).getOrElse(0) + 1
>       goal -> cnt
>     })
>     buffer.update(0, result)
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val map1 = buffer1.getAs[Map[String, Int]](0)
>     val map2 = buffer2.getAs[Map[String, Int]](0)
>     val result = map1 ++ map2.map { case (k,v) =>
>       val cnt = map1.get(k).getOrElse(0) + 1
>       k -> cnt
>     }
>     buffer1.update(0, result)
>   }
>
>   override def evaluate(buffer: Row): Any = {
>     buffer.getAs[Map[String, Int]](0)
>   }
> }
>
>
>
>
>

Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

Posted by shyla deshpande <de...@gmail.com>.
Thanks a million Yong. Great help!!! It solved my problem.

On Thu, Mar 23, 2017 at 6:00 PM, Yong Zhang <ja...@hotmail.com> wrote:

> Change:
>
> val arrayinput = input.getAs[Array[String]](0)
>
> to:
>
> val arrayinput = input.getAs[*Seq*[String]](0)
>
>
> Yong
>
>
> ------------------------------
> *From:* shyla deshpande <de...@gmail.com>
> *Sent:* Thursday, March 23, 2017 8:18 PM
> *To:* user
> *Subject:* Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!
>
> This is my input data. The UDAF needs to aggregate the goals for a team
> and return a map that  gives the count for every goal in the team.
> I am getting the following error
>
> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
> cannot be cast to [Ljava.lang.String;
> at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)
>
> +------+--------------+
> |teamid|goals         |
> +------+--------------+
> |t1    |[Goal1, Goal2]|
> |t1    |[Goal1, Goal3]|
> |t2    |[Goal1, Goal2]|
> |t3    |[Goal2, Goal3]|
> +------+--------------+
>
> root
>  |-- teamid: string (nullable = true)
>  |-- goals: array (nullable = true)
>  |    |-- element: string (containsNull = true)
>
> /////////////////////////Calling the UDAF//////////
>
> object TestUDAF {
>   def main(args: Array[String]): Unit = {
>
>     val spark = SparkSession
>       .builder
>       .getOrCreate()
>
>     val sc: SparkContext = spark.sparkContext
>     val sqlContext = spark.sqlContext
>
>     import sqlContext.implicits._
>
>     val data = Seq(
>       ("t1", Seq("Goal1", "Goal2")),
>       ("t1", Seq("Goal1", "Goal3")),
>       ("t2", Seq("Goal1", "Goal2")),
>       ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")
>
>     data.show(truncate = false)
>     data.printSchema()
>
>     import spark.implicits._
>
>     val sumgoals = new GoalAggregator
>     val result = data.groupBy("teamid").agg(sumgoals(col("goals")))
>
>     result.show(truncate = false)
>
>   }
> }
>
> ///////////////UDAF/////////////////
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> class GoalAggregator extends UserDefinedAggregateFunction{
>
>   override def inputSchema: org.apache.spark.sql.types.StructType =
>   StructType(StructField("value", ArrayType(StringType)) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>       StructField("combined", MapType(StringType,IntegerType)) :: Nil
>   )
>
>   override def dataType: DataType = MapType(StringType,IntegerType)
>
>   override def deterministic: Boolean = true
>
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer.update(0, Map[String, Integer]())
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     val mapbuf = buffer.getAs[Map[String, Int]](0)
>     val arrayinput = input.getAs[Array[String]](0)
>     val result = mapbuf ++ arrayinput.map(goal => {
>       val cnt  = mapbuf.get(goal).getOrElse(0) + 1
>       goal -> cnt
>     })
>     buffer.update(0, result)
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val map1 = buffer1.getAs[Map[String, Int]](0)
>     val map2 = buffer2.getAs[Map[String, Int]](0)
>     val result = map1 ++ map2.map { case (k,v) =>
>       val cnt = map1.get(k).getOrElse(0) + 1
>       k -> cnt
>     }
>     buffer1.update(0, result)
>   }
>
>   override def evaluate(buffer: Row): Any = {
>     buffer.getAs[Map[String, Int]](0)
>   }
> }
>
>
>
>

Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

Posted by Yong Zhang <ja...@hotmail.com>.
Change:

val arrayinput = input.getAs[Array[String]](0)

to:

val arrayinput = input.getAs[Seq[String]](0)


Yong


________________________________
From: shyla deshpande <de...@gmail.com>
Sent: Thursday, March 23, 2017 8:18 PM
To: user
Subject: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

This is my input data. The UDAF needs to aggregate the goals for a team and return a map that  gives the count for every goal in the team.
I am getting the following error

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)

+------+--------------+
|teamid|goals         |
+------+--------------+
|t1    |[Goal1, Goal2]|
|t1    |[Goal1, Goal3]|
|t2    |[Goal1, Goal2]|
|t3    |[Goal2, Goal3]|
+------+--------------+

root
 |-- teamid: string (nullable = true)
 |-- goals: array (nullable = true)
 |    |-- element: string (containsNull = true)

/////////////////////////Calling the UDAF//////////

object TestUDAF {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val sqlContext = spark.sqlContext

    import sqlContext.implicits._

    val data = Seq(
      ("t1", Seq("Goal1", "Goal2")),
      ("t1", Seq("Goal1", "Goal3")),
      ("t2", Seq("Goal1", "Goal2")),
      ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")

    data.show(truncate = false)
    data.printSchema()

    import spark.implicits._

    val sumgoals = new GoalAggregator
    val result = data.groupBy("teamid").agg(sumgoals(col("goals")))

    result.show(truncate = false)

  }
}

///////////////UDAF/////////////////

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GoalAggregator extends UserDefinedAggregateFunction{

  override def inputSchema: org.apache.spark.sql.types.StructType =
  StructType(StructField("value", ArrayType(StringType)) :: Nil)

  override def bufferSchema: StructType = StructType(
      StructField("combined", MapType(StringType,IntegerType)) :: Nil
  )

  override def dataType: DataType = MapType(StringType,IntegerType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, Map[String, Integer]())
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val mapbuf = buffer.getAs[Map[String, Int]](0)
    val arrayinput = input.getAs[Array[String]](0)
    val result = mapbuf ++ arrayinput.map(goal => {
      val cnt  = mapbuf.get(goal).getOrElse(0) + 1
      goal -> cnt
    })
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val map1 = buffer1.getAs[Map[String, Int]](0)
    val map2 = buffer2.getAs[Map[String, Int]](0)
    val result = map1 ++ map2.map { case (k,v) =>
      val cnt = map1.get(k).getOrElse(0) + 1
      k -> cnt
    }
    buffer1.update(0, result)
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[String, Int]](0)
  }
}