Posted to user@spark.apache.org by Adrian Mocanu <am...@verticalscope.com> on 2014/03/26 16:34:31 UTC

closures & moving averages (state)

I'm passing a moving average function during the map phase like this:
  val average = new Sma(window = 3)
  stream.map(x => average.addNumber(x))

where
  class Sma extends Serializable { .. }

I also tried creating the average object inside an object, as I saw in another post:
  object Average {
    val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3)
  }
Every time average.addNumber is called, it is called on a new instance.
How can I preserve the state of the average object?

Thanks
-Adrian
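
A minimal sketch of why the state appears to vanish, assuming the cause is closure serialization: Spark serializes the function passed to map together with everything it captures and ships it to the tasks, so each task mutates its own deserialized copy. The sketch below uses plain Java serialization and no Spark at all; DemoSma and ClosureCopyDemo are hypothetical names, and DemoSma is only a stand-in for the Sma class, whose body is not shown in this thread.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the poster's Sma class (the real body is elided in the thread).
class DemoSma(window: Int) extends Serializable {
  private var values = Vector.empty[Double]

  // Add a value, keep only the last `window` values, return their average.
  def addNumber(x: Double): Double = {
    values = (values :+ x).takeRight(window)
    values.sum / values.size
  }

  def count: Int = values.size
}

object ClosureCopyDemo {
  // Serialize and deserialize an object, roughly what happens to the values
  // captured by a closure when it is shipped to a task.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new DemoSma(3)
    val taskCopy = roundTrip(driverSide) // an independent copy, not the same instance
    taskCopy.addNumber(10.0)
    taskCopy.addNumber(20.0)
    // prints "copy has 2 values, original has 0"
    println(s"copy has ${taskCopy.count} values, original has ${driverSide.count}")
  }
}
```

Updates made to the copy never reach the original, which would match the "new instance every time" behaviour described above.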


RE: closures & moving averages (state)

Posted by Adrian Mocanu <am...@verticalscope.com>.
I tried with reduce and it gives me pretty weird results that make no sense, i.e. 1 number for an entire RDD:

  val smaStream= inputStream.reduce{ case(t1,t2) =>
    {
      val sma= average.addDataPoint(t1)
      sma
    }}
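
The single number is what reduce is meant to produce: it folds all elements of each RDD into one value. The plain-Scala sketch below (no Spark involved; ReduceVsSliding and movingAverage are hypothetical names) contrasts reduce with a sliding-window average on an ordinary collection, assuming a window of 3 as in the original post.

```scala
object ReduceVsSliding {
  // Average of every run of `window` consecutive values; a run shorter than
  // `window` (input too small) is dropped.
  def movingAverage(xs: Seq[Double], window: Int): List[Double] =
    xs.sliding(window).filter(_.size == window).map(w => w.sum / window).toList

  def main(args: Array[String]): Unit = {
    val batch = Seq(1.0, 2.0, 3.0, 4.0, 5.0)

    // reduce collapses the whole collection into one value
    val reduced = batch.reduce((a, b) => a + b)
    println(reduced) // prints 15.0

    // a moving average yields one value per full window instead
    println(movingAverage(batch, 3)) // prints List(2.0, 3.0, 4.0)
  }
}
```

Note that the reduce call in the message above also ignores t2, so the value it returns depends only on how often and in what order the function happens to be invoked.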


I tried with transform and that worked correctly, but unfortunately it works on 1 RDD at a time, so the moving average is reset when the next consecutive RDD is read, as if a new instance of the Average class were created for each RDD.

Is there a way to have 1 global variable of a custom type (i.e. in my case the Average type), somewhat like accumulators but not incrementable in parallel? Parallel increments wouldn't make sense for a moving average.

The reason I want to apply a moving average function to a stream is so that the tuples remain in Spark and benefit from its fault-tolerance mechanisms.

My guess is that currently this is not possible, but I'll wait for one of the Spark creators to comment on this.
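
One possible direction, offered as a sketch rather than a confirmed answer from this thread: Spark Streaming's updateStateByKey carries per-key state from one batch to the next, and its update function takes the new values for a key plus the previous state. The code below only models such an update function in plain Scala so it can run without a cluster; WindowState, Window, update and average are hypothetical names, and keeping the last 3 values as the state is an assumption based on window=3 in the original post.

```scala
object WindowState {
  // Hypothetical window size, matching window=3 from the original post.
  val Window = 3

  // Same shape as an update function for updateStateByKey:
  // (new values for a key in this batch, previous state) => new state.
  def update(newValues: Seq[Double], state: Option[Vector[Double]]): Option[Vector[Double]] = {
    val merged = state.getOrElse(Vector.empty[Double]) ++ newValues
    Some(merged.takeRight(Window)) // keep only the most recent values
  }

  // Average of the values currently held in the state, if any.
  def average(window: Vector[Double]): Option[Double] =
    if (window.isEmpty) None else Some(window.sum / window.size)

  def main(args: Array[String]): Unit = {
    // Simulate two consecutive batches for a single key.
    val afterBatch1 = update(Seq(1.0, 2.0), None)
    val afterBatch2 = update(Seq(3.0, 4.0), afterBatch1)
    println(afterBatch2.flatMap(average)) // prints Some(3.0)
  }
}
```

With Spark Streaming on the classpath, a function of this shape could be passed to updateStateByKey on a keyed DStream (for example a hypothetical DStream[(String, Double)]), with checkpointing enabled on the streaming context. Two caveats: this yields one state per key per batch rather than one average per element, and the order of values within a batch is not guaranteed to be arrival order.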

-A

From: Benjamin Black [mailto:b@b3k.us]
Sent: March-26-14 11:50 AM
To: user@spark.apache.org
Subject: Re: closures & moving averages (state)

Perhaps you want reduce rather than map?



Re: closures & moving averages (state)

Posted by Benjamin Black <b...@b3k.us>.
Perhaps you want reduce rather than map?
