You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by tog <gu...@gmail.com> on 2015/07/02 18:37:27 UTC

sliding

Hi

Sorry for this scala/spark newbie question. I am creating RDD which
represent large time series this way:
val data = sc.textFile("somefile.csv")

case class Event(
    time:       Double,
    x:          Double,
    vztot:      Double
)

val events = data.filter(s => !s.startsWith("GMT")).map{s =>
    val r = s.split(";")
...
    Event(time, x, vztot )
}

I would like to process those RDD in order to reduce them by some
filtering. For this I noticed that sliding could help but I was not able to
use it so far. Here is what I did:

import org.apache.spark.mllib.rdd.RDDFunctions._

val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))

Thanks for your help


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net

Re: sliding

Posted by tog <gu...@gmail.com>.
Understood. Thanks for your great help

Cheers
Guillaume

On 2 July 2015 at 23:23, Feynman Liang <fl...@databricks.com> wrote:

> Consider an example dataset [a, b, c, d, e, f]
>
> After sliding(3), you get [(a,b,c), (b,c,d), (c, d, e), (d, e, f)]
>
> After zipWithIndex: [((a,b,c), 0), ((b,c,d), 1), ((c, d, e), 2), ((d, e,
> f), 3)]
>
> After filter: [((a,b,c), 0), ((d, e, f), 3)], which is what I'm assuming
> you want (non-overlapping buckets)? You can then do something like
> .map(func(_._1)) to apply func (e.g. min, max, mean) to the 3-tuples.
>
> On Thu, Jul 2, 2015 at 3:20 PM, tog <gu...@gmail.com> wrote:
>
>> Well it did reduce the length of my serie of events. I will have to dig
>> what it did actually ;-)
>>
>> I would assume that it took one out of 3 value, is that correct ?
>> Would it be possible to control a bit more how the value assigned to the
>> bucket is computed for example take the first element, the min, the max,
>> mean ... any other function.
>>
>> Thanks for putting me on the right track
>>
>> On 2 July 2015 at 22:56, Feynman Liang <fl...@databricks.com> wrote:
>>
>>> How about:
>>>
>>> events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)
>>>
>>> That would group the RDD into adjacent buckets of size 3.
>>>
>>> On Thu, Jul 2, 2015 at 2:33 PM, tog <gu...@gmail.com> wrote:
>>>
>>>> Was complaining about the Seq ...
>>>>
>>>> Moved it to
>>>> val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
>>>> (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))
>>>>
>>>> and that is working.
>>>>
>>>> Anyway this is not what I wanted to do, my goal was more to implement
>>>> bucketing to shorten the time serie.
>>>>
>>>>
>>>> On 2 July 2015 at 18:25, Feynman Liang <fl...@databricks.com> wrote:
>>>>
>>>>> What's the error you are getting?
>>>>>
>>>>> On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Sorry for this scala/spark newbie question. I am creating RDD which
>>>>>> represent large time series this way:
>>>>>> val data = sc.textFile("somefile.csv")
>>>>>>
>>>>>> case class Event(
>>>>>>     time:       Double,
>>>>>>     x:          Double,
>>>>>>     vztot:      Double
>>>>>> )
>>>>>>
>>>>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>>>>     val r = s.split(";")
>>>>>> ...
>>>>>>     Event(time, x, vztot )
>>>>>> }
>>>>>>
>>>>>> I would like to process those RDD in order to reduce them by some
>>>>>> filtering. For this I noticed that sliding could help but I was not able to
>>>>>> use it so far. Here is what I did:
>>>>>>
>>>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>>>>
>>>>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>>>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>>>>
>>>>>> Thanks for your help
>>>>>>
>>>>>>
>>>>>> --
>>>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>
>>>
>>>
>>
>>
>> --
>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>
>
>


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net

Re: sliding

Posted by Feynman Liang <fl...@databricks.com>.
Consider an example dataset [a, b, c, d, e, f]

After sliding(3), you get [(a,b,c), (b,c,d), (c, d, e), (d, e, f)]

After zipWithIndex: [((a,b,c), 0), ((b,c,d), 1), ((c, d, e), 2), ((d, e,
f), 3)]

After filter: [((a,b,c), 0), ((d, e, f), 3)], which is what I'm assuming
you want (non-overlapping buckets)? You can then do something like
.map(func(_._1)) to apply func (e.g. min, max, mean) to the 3-tuples.

On Thu, Jul 2, 2015 at 3:20 PM, tog <gu...@gmail.com> wrote:

> Well it did reduce the length of my serie of events. I will have to dig
> what it did actually ;-)
>
> I would assume that it took one out of 3 value, is that correct ?
> Would it be possible to control a bit more how the value assigned to the
> bucket is computed for example take the first element, the min, the max,
> mean ... any other function.
>
> Thanks for putting me on the right track
>
> On 2 July 2015 at 22:56, Feynman Liang <fl...@databricks.com> wrote:
>
>> How about:
>>
>> events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)
>>
>> That would group the RDD into adjacent buckets of size 3.
>>
>> On Thu, Jul 2, 2015 at 2:33 PM, tog <gu...@gmail.com> wrote:
>>
>>> Was complaining about the Seq ...
>>>
>>> Moved it to
>>> val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
>>> (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))
>>>
>>> and that is working.
>>>
>>> Anyway this is not what I wanted to do, my goal was more to implement
>>> bucketing to shorten the time serie.
>>>
>>>
>>> On 2 July 2015 at 18:25, Feynman Liang <fl...@databricks.com> wrote:
>>>
>>>> What's the error you are getting?
>>>>
>>>> On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Sorry for this scala/spark newbie question. I am creating RDD which
>>>>> represent large time series this way:
>>>>> val data = sc.textFile("somefile.csv")
>>>>>
>>>>> case class Event(
>>>>>     time:       Double,
>>>>>     x:          Double,
>>>>>     vztot:      Double
>>>>> )
>>>>>
>>>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>>>     val r = s.split(";")
>>>>> ...
>>>>>     Event(time, x, vztot )
>>>>> }
>>>>>
>>>>> I would like to process those RDD in order to reduce them by some
>>>>> filtering. For this I noticed that sliding could help but I was not able to
>>>>> use it so far. Here is what I did:
>>>>>
>>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>>>
>>>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>>>
>>>>> Thanks for your help
>>>>>
>>>>>
>>>>> --
>>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>
>>
>>
>
>
> --
> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>

Re: sliding

Posted by tog <gu...@gmail.com>.
Well it did reduce the length of my serie of events. I will have to dig
what it did actually ;-)

I would assume that it took one out of 3 value, is that correct ?
Would it be possible to control a bit more how the value assigned to the
bucket is computed for example take the first element, the min, the max,
mean ... any other function.

Thanks for putting me on the right track

On 2 July 2015 at 22:56, Feynman Liang <fl...@databricks.com> wrote:

> How about:
>
> events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)
>
> That would group the RDD into adjacent buckets of size 3.
>
> On Thu, Jul 2, 2015 at 2:33 PM, tog <gu...@gmail.com> wrote:
>
>> Was complaining about the Seq ...
>>
>> Moved it to
>> val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
>> (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))
>>
>> and that is working.
>>
>> Anyway this is not what I wanted to do, my goal was more to implement
>> bucketing to shorten the time serie.
>>
>>
>> On 2 July 2015 at 18:25, Feynman Liang <fl...@databricks.com> wrote:
>>
>>> What's the error you are getting?
>>>
>>> On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> Sorry for this scala/spark newbie question. I am creating RDD which
>>>> represent large time series this way:
>>>> val data = sc.textFile("somefile.csv")
>>>>
>>>> case class Event(
>>>>     time:       Double,
>>>>     x:          Double,
>>>>     vztot:      Double
>>>> )
>>>>
>>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>>     val r = s.split(";")
>>>> ...
>>>>     Event(time, x, vztot )
>>>> }
>>>>
>>>> I would like to process those RDD in order to reduce them by some
>>>> filtering. For this I noticed that sliding could help but I was not able to
>>>> use it so far. Here is what I did:
>>>>
>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>>
>>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>>
>>>> Thanks for your help
>>>>
>>>>
>>>> --
>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>
>>>
>>>
>>
>>
>> --
>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>
>
>


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net

Re: sliding

Posted by Feynman Liang <fl...@databricks.com>.
How about:

events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)

That would group the RDD into adjacent buckets of size 3.

On Thu, Jul 2, 2015 at 2:33 PM, tog <gu...@gmail.com> wrote:

> Was complaining about the Seq ...
>
> Moved it to
> val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
> (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))
>
> and that is working.
>
> Anyway this is not what I wanted to do, my goal was more to implement
> bucketing to shorten the time serie.
>
>
> On 2 July 2015 at 18:25, Feynman Liang <fl...@databricks.com> wrote:
>
>> What's the error you are getting?
>>
>> On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> Sorry for this scala/spark newbie question. I am creating RDD which
>>> represent large time series this way:
>>> val data = sc.textFile("somefile.csv")
>>>
>>> case class Event(
>>>     time:       Double,
>>>     x:          Double,
>>>     vztot:      Double
>>> )
>>>
>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>     val r = s.split(";")
>>> ...
>>>     Event(time, x, vztot )
>>> }
>>>
>>> I would like to process those RDD in order to reduce them by some
>>> filtering. For this I noticed that sliding could help but I was not able to
>>> use it so far. Here is what I did:
>>>
>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>
>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>
>>> Thanks for your help
>>>
>>>
>>> --
>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>
>>
>>
>
>
> --
> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>

Re: sliding

Posted by tog <gu...@gmail.com>.
Was complaining about the Seq ...

Moved it to
val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
(s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))

and that is working.

Anyway this is not what I wanted to do, my goal was more to implement
bucketing to shorten the time serie.


On 2 July 2015 at 18:25, Feynman Liang <fl...@databricks.com> wrote:

> What's the error you are getting?
>
> On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com> wrote:
>
>> Hi
>>
>> Sorry for this scala/spark newbie question. I am creating RDD which
>> represent large time series this way:
>> val data = sc.textFile("somefile.csv")
>>
>> case class Event(
>>     time:       Double,
>>     x:          Double,
>>     vztot:      Double
>> )
>>
>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>     val r = s.split(";")
>> ...
>>     Event(time, x, vztot )
>> }
>>
>> I would like to process those RDD in order to reduce them by some
>> filtering. For this I noticed that sliding could help but I was not able to
>> use it so far. Here is what I did:
>>
>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>
>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>
>> Thanks for your help
>>
>>
>> --
>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>
>
>


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net

Re: sliding

Posted by Feynman Liang <fl...@databricks.com>.
What's the error you are getting?

On Thu, Jul 2, 2015 at 9:37 AM, tog <gu...@gmail.com> wrote:

> Hi
>
> Sorry for this scala/spark newbie question. I am creating RDD which
> represent large time series this way:
> val data = sc.textFile("somefile.csv")
>
> case class Event(
>     time:       Double,
>     x:          Double,
>     vztot:      Double
> )
>
> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>     val r = s.split(";")
> ...
>     Event(time, x, vztot )
> }
>
> I would like to process those RDD in order to reduce them by some
> filtering. For this I noticed that sliding could help but I was not able to
> use it so far. Here is what I did:
>
> import org.apache.spark.mllib.rdd.RDDFunctions._
>
> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>
> Thanks for your help
>
>
> --
> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>