Posted to user@spark.apache.org by Deenar Toraskar <de...@gmail.com> on 2016/01/25 23:36:35 UTC
Generic Dataset Aggregator
Hi All
https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
I have been converting my UDAFs to Dataset Aggregators (Datasets are cool, BTW).
I have an ArraySum aggregator that does an element-wise sum of arrays. I have
got the simple version working, but the generic version fails with the
following error; I am not sure what I am doing wrong.
scala> import sqlContext.implicits._

scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
     |   new GenericArraySumAggregator(f).toColumn
<console>:34: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing sqlContext.implicits._ Support for serializing
other types will be added in future releases.
       def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
         new GenericArraySumAggregator(f).toColumn
                                             ^
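
For what it's worth, in 1.6 Aggregator.toColumn takes implicit encoders for
both the buffer and the output type, so this call actually needs an
Encoder[Seq[N]]; the N : Encoder context bound only supplies Encoder[N], and
sqlContext.implicits._ does not derive a Seq encoder from it. A minimal
sketch that just surfaces the real requirement (it does not conjure the
missing encoder, and it also corrects the column type to match
Aggregator[Seq[I], Seq[N], Seq[N]]):

// Sketch only: demand the encoder toColumn actually needs. The caller
// must still provide an Encoder[Seq[N]] from somewhere.
def arraySum[I, N : Numeric](f: I => N)(
    implicit seqEnc: Encoder[Seq[N]]): TypedColumn[Seq[I], Seq[N]] =
  new GenericArraySumAggregator(f).toColumn

The two aggregators, for reference: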
object ArraySumAggregator extends Aggregator[Seq[Float], Seq[Float], Seq[Float]]
    with Serializable {
  def zero: Seq[Float] = Nil // The initial value.
  def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
    sumArray(currentSum, currentRow)
  def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
  def finish(b: Seq[Float]) = b // Return the final result.
  def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] =
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map(_ + _)
    }
}
class GenericArraySumAggregator[I, N : Numeric](f: I => N)
    extends Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: Seq[N] = Nil
  override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b, a.map(f))
  override def merge(b1: Seq[N], b2: Seq[N]): Seq[N] = sumArray(b1, b2)
  override def finish(reduction: Seq[N]): Seq[N] = reduction
  def sumArray(a: Seq[N], b: Seq[N]): Seq[N] =
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map(numeric.plus)
    }
}
Regards
Deenar
Re: Generic Dataset Aggregator
Posted by Arkadiusz Bicz <ar...@gmail.com>.
Hi Deenar,
You just need to encapsulate the Array in a case class (you cannot define
the case class inside the Spark shell, as it cannot be an inner class):
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.expressions.Aggregator

case class ResultSmallA(tradeId: String, tradeVersion: String, values: Array[Double])

class AggregateResults extends Aggregator[ResultSmallA, ResultSmallA, ResultSmallA]
    with Serializable {
  // The initial value: an empty array, so the first reduce keeps the row's
  // values intact instead of truncating against a mismatched length.
  def zero: ResultSmallA = ResultSmallA("", "", Array.empty[Double])
  // Add an element to the running total, carrying the row's keys.
  def reduce(b: ResultSmallA, a: ResultSmallA) =
    ResultSmallA(a.tradeId, a.tradeVersion, sumArrays(a.values, b.values))
  // Merge intermediate values.
  def merge(b1: ResultSmallA, b2: ResultSmallA) =
    ResultSmallA(b1.tradeId, b1.tradeVersion, sumArrays(b1.values, b2.values))
  def finish(b: ResultSmallA) = b
  private def sumArrays(a: Array[Double], b: Array[Double]): Array[Double] =
    if (a.isEmpty) b
    else if (b.isEmpty) a
    else (a, b).zipped.map(_ + _)
}

def sumRes: TypedColumn[ResultSmallA, ResultSmallA] = new AggregateResults().toColumn

import sqlContext.implicits._

val dsResults = Seq(ResultSmallA("1", "1", Array[Double](1.0, 2.0)),
                    ResultSmallA("1", "1", Array[Double](1.0, 2.0))).toDS()
dsResults.groupBy(_.tradeId).agg(sumRes)
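
The same wrapping trick should carry over to your generic version if you fix
the element type at the boundary: encoders are resolved for concrete case
classes, not for Seq[N]. A hypothetical, untested sketch (FloatVector and
vectorSum are my names, and it gives up genericity in N):

// Hypothetical: a concrete wrapper type so implicits can find an encoder.
case class FloatVector(values: Seq[Float])

object FloatVectorSum
    extends Aggregator[FloatVector, FloatVector, FloatVector]
    with Serializable {
  def zero: FloatVector = FloatVector(Nil)
  // Element-wise sum; an empty side means "no values seen yet".
  def merge(b1: FloatVector, b2: FloatVector) = (b1.values, b2.values) match {
    case (Nil, row) => FloatVector(row)
    case (sum, Nil) => FloatVector(sum)
    case (sum, row) => FloatVector((sum, row).zipped.map(_ + _))
  }
  def reduce(b: FloatVector, a: FloatVector) = merge(b, a)
  def finish(b: FloatVector) = b
}

def vectorSum: TypedColumn[FloatVector, FloatVector] = FloatVectorSum.toColumn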
Best Regards,
Arkadiusz Bicz
https://uk.linkedin.com/in/arkadiuszbicz
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org