Posted to users@zeppelin.apache.org by "Vannson, Raphael" <Ra...@Thinkbiganalytics.com> on 2018/02/27 23:19:14 UTC

Cannot define UDAF in %spark interpreter

Hello,

I am having trouble defining a UDAF; the same code works fine in spark-shell's :paste mode.

Environment:
 - Amazon EMR
 - Apache Zeppelin Version 0.7.3
 - Spark version 2.2.1
 - Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)

1) Is there a way to configure Zeppelin's %spark interpreter to do the equivalent of spark-shell's :paste mode?
2) If not, is there a workaround for defining UDAFs in Zeppelin's %spark interpreter?

Thanks!
Raphael




***PARAGRAPH INPUT:***
%spark

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer

class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
    // Input schema
    override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
    
    // Intermediate buffer schema
    override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
    
    // Output schema
    override def dataType: DataType = StringType
    
    // Deterministic UDAF
    override def deterministic: Boolean = true
    
    
    
    // How to initialize the intermediate processing buffer for each group:
    // We start from an empty array, which will accumulate the observations (y)
    // of each group
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Array.emptyDoubleArray
    }
    
    // What to do with each new row within the group:
    // Here we append each new observation of the group 
    // in a List[Double]
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // Put the observations collected into a List
        var values = new ListBuffer[Double]()
        values.appendAll(buffer.getAs[List[Double]](0))
        
        // Get the new value for the current row
        val newValue = input.getDouble(0)
        
        // Append the new value and store the result back in the buffer
        values.append(newValue)
        buffer.update(0, values)
    }
  
  
    // How to merge 2 buffers located on 2 separate executor hosts or JVMs:
    // Simply append one List at the end of another
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        var values = new ListBuffer[Double]()
        values ++= buffer1.getAs[List[Double]](0)
        values ++= buffer2.getAs[List[Double]](0)
        buffer1.update(0, values)
  }
  
  
  
  override def evaluate(buffer: Row): String = {
      val observations = buffer.getSeq[Double](0)
      observations.size.toString
  }
}



***PARAGRAPH OUTPUT:***
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.ListBuffer
<console>:12: error: not found: type UserDefinedAggregateFunction
       class AggregatedChangepointAnalyzer extends UserDefinedAggregateFunction {
                                                   ^
<console>:14: error: not found: type StructType
           override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                     ^
<console>:14: error: not found: value StructType
           override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                  ^
<console>:14: error: not found: value StructField
           override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                             ^
<console>:14: error: not found: value DoubleType
           override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
                                                                              ^
<console>:17: error: not found: type StructType
           override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                      ^
<console>:17: error: not found: value StructType
           override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                   ^
<console>:17: error: not found: value StructField
           override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                              ^
<console>:17: error: not found: value ArrayType
           override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                          ^
<console>:17: error: not found: value DoubleType
           override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
                                                                                                    ^
<console>:20: error: not found: type DataType
           override def dataType: DataType = StringType
                                  ^
<console>:20: error: not found: value StringType
           override def dataType: DataType = StringType
                                             ^
<console>:30: error: not found: type MutableAggregationBuffer
           override def initialize(buffer: MutableAggregationBuffer): Unit = {
                                           ^
<console>:37: error: not found: type MutableAggregationBuffer
           override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                       ^
<console>:37: error: not found: type Row
           override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
                                                                        ^
<console>:39: error: not found: type ListBuffer
               var values = new ListBuffer[Double]()
                                ^
<console>:53: error: not found: type MutableAggregationBuffer
           override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                       ^
<console>:53: error: not found: type Row
           override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
                                                                          ^
<console>:54: error: not found: type ListBuffer
               var values = new ListBuffer[Double]()
                                ^
<console>:62: error: not found: type Row
         override def evaluate(buffer: Row): String = {
                                       ^

 


Re: Cannot define UDAF in %spark interpreter

Posted by "Vannson, Raphael" <Ra...@Thinkbiganalytics.com>.
Hello Paul,

Many thanks for your quick answer. This did the trick!
Fantastic!

Best,
Raphael



***PARAGRAPH INPUT:***
val AggregatedChangepointAnalyzer = new UserDefinedAggregateFunction {
…
}

***PARAGRAPH OUTPUT:***
AggregatedChangepointAnalyzer: org.apache.spark.sql.expressions.UserDefinedAggregateFunction{def evaluate(buffer: org.apache.spark.sql.Row): String} = $$$$79b2515edf74bd80cfc9d8ac1ba563c6$$$$anon$1@3b65afbc
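For completeness, the full paragraph looks roughly like this — a sketch reconstructed from my original class, with the same schemas and method bodies, just declared as an anonymous object rather than a named class (the one other change: I read the buffer with getSeq[Double] instead of getAs[List[Double]], which matches how Spark hands array columns back as a Seq):

%spark

import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer

// Anonymous object instead of a named class: this is the change that
// makes the definition compile in the %spark interpreter.
val AggregatedChangepointAnalyzer = new UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(StructField("y", DoubleType) :: Nil)
    override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
    override def dataType: DataType = StringType
    override def deterministic: Boolean = true

    // Start each group with an empty array of observations
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Array.emptyDoubleArray
    }

    // Append the current row's observation to the group's buffer
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val values = new ListBuffer[Double]()
        values ++= buffer.getSeq[Double](0)
        values += input.getDouble(0)
        buffer.update(0, values)
    }

    // Concatenate the observation lists of two partial aggregates
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val values = new ListBuffer[Double]()
        values ++= buffer1.getSeq[Double](0)
        values ++= buffer2.getSeq[Double](0)
        buffer1.update(0, values)
    }

    // For now the output is just the number of observations collected
    override def evaluate(buffer: Row): String = {
        buffer.getSeq[Double](0).size.toString
    }
}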



I was then able to use the UDAF easily:
***PARAGRAPH INPUT:***
val cpt_df = df.groupBy("foo", "bar", "baz", "bok").agg(AggregatedChangepointAnalyzer(col("y")).as("cpt"))
cpt_df.show

***PARAGRAPH OUTPUT:***
cpt_df: org.apache.spark.sql.DataFrame = [foo: string, bar: string ... 3 more fields]
+--------+--------+--------+--------+---+
|foo     |bar     |baz     |bok     |cpt|
+--------+--------+--------+--------+---+
|some    |secret  |thing   |here    | 40|
+--------+--------+--------+--------+---+




From: Paul Brenner <pb...@placeiq.com>
Date: Tuesday, February 27, 2018 at 3:31 PM
To: Raphael Vannson <Ra...@Thinkbiganalytics.com>, "users@zeppelin.apache.org" <us...@zeppelin.apache.org>
Subject: Cannot define UDAF in %spark interpreter

Unfortunately, I don’t know why code that works for you in spark-shell isn’t working in Zeppelin. But if you are looking for a quick fix, perhaps this could help?

I’ve had luck defining my UDAFs in Zeppelin like:
val myUDAF = new UserDefinedAggregateFunction {}



So for example the following code compiles fine for me in zeppelin:

val FractionOfDayCoverage = new UserDefinedAggregateFunction {

  // Input Data Type Schema
  def inputSchema: StructType = StructType(Array(StructField("seconds", LongType)))

  // Intermediate Schema
  def bufferSchema = StructType(Array(
    StructField("times", ArrayType(LongType))))

  // Returned Data Type
  def dataType = DoubleType

  // Self-explaining
  def deterministic = true

  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    var timeArray = new ListBuffer[Long]()
    buffer.update(0, timeArray)
  }

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      var timeArray = new ListBuffer[Long]()
      timeArray ++= buffer.getAs[List[Long]](0)
      timeArray += input.getLong(0)
      buffer.update(0, timeArray)
    }
  }

  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer1.getAs[List[Long]](0)
    timeArray ++= buffer2.getAs[List[Long]](0)
    buffer1.update(0, timeArray)
  }

  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {
    var timeArray = new ListBuffer[Long]()
    timeArray ++= buffer.getAs[List[Long]](0).filter(x => x != null)
    val times = timeArray.toArray
    scala.util.Sorting.quickSort(times)
    var intStart = times(0) - 30*60
    var intEnd = times(0) + 30*60
    var seen = 0L
    for (t <- times) {
      if (t > intEnd + 30*60) {
        seen += (intEnd - intStart)
        intStart = t - 30*60
        intEnd = t + 30*60
      } else {
        intEnd = t + 30*60
      }
    }
    seen += intEnd - intStart
    math.min(seen.toDouble/(24*60*60), 1)
  }
}
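(This assumes the same imports as in your original paragraph — roughly:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer

You'd then apply it the usual way, e.g. df.groupBy("user").agg(FractionOfDayCoverage(col("seconds")).as("coverage")) — here df, "user", and "seconds" are just placeholder names.)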


I’m using Zeppelin 0.7.2 and Spark 2.0.1 (I think), so perhaps there is a version issue somewhere?

Paul Brenner
DATA SCIENTIST, PlaceIQ
(217) 390-3033


