Posted to user@beam.apache.org by Tristan Marechaux <tr...@walnut-algo.com> on 2017/08/14 09:43:04 UTC

Resampling a time series stuck on a GroupByKey

Hi all,

I wrote a Beam pipeline with the Python SDK that resamples a time series
containing one data point per minute into a 5-minute time series.

My pipeline looks like:
input_data |
WindowInto(FixedWindows(size=timedelta(minutes=5).total_seconds())) |
CombineGlobally(resample_function)
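For reference, the semantics I expect from this windowed combine can be sketched in plain Python (no Beam here; window assignment is done by hand, and sum stands in for my resample_function):

```python
WINDOW_SECONDS = 5 * 60  # FixedWindows(size=timedelta(minutes=5).total_seconds())

def window_start(ts):
    """Start of the 5-minute fixed window containing timestamp ts (seconds)."""
    return ts - (ts % WINDOW_SECONDS)

def resample(points):
    """Group (timestamp, value) pairs by 5-minute window and combine each
    window's values; sum is a stand-in for the real combine function."""
    windows = {}
    for ts, value in points:
        windows.setdefault(window_start(ts), []).append(value)
    return {start: sum(values) for start, values in sorted(windows.items())}

# One data point per minute for 10 minutes -> two 5-minute windows.
points = [(60.0 * i, 1.0) for i in range(10)]
print(resample(points))  # {0.0: 5.0, 300.0: 5.0}
```

In Beam, the GroupByKey inside CombineGlobally plays the role of the dict above, collecting all values that fall into the same window before the combiner runs.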

When I run it with the local runner, or with the Dataflow runner on a small
dataset, it works and does what I want.

But when I try to run it on the Dataflow runner with a bigger dataset
(1,700,000 data points timestamped over 15 years), it stays stuck for hours
on the GroupByKey step of CombineGlobally.

My question is: did I do something wrong with the design of my pipeline?

PS: Can someone invite me to the slack channel?
-- 

Tristan Marechaux

Data Scientist | *Walnut Algorithms*

Mobile: +33 627804399

Email: tristan.marechaux@walnut-algo.com

Web: www.walnutalgorithms.com

Re: Resampling a time series stuck on a GroupByKey

Posted by Lukasz Cwik <lc...@google.com>.
Do you have some job ids that you could share?


Re: Resampling a time series stuck on a GroupByKey

Posted by Tristan Marechaux <tr...@walnut-algo.com>.
Thanks for the invitation and for the answer.

I tried the Count combiner and I still have the same issue, so I guess the
problem doesn't come from my resample function, but here is the code just
in case:

def resample_function(candles):
    # Keep only candles that have a timestamp; sorted() materializes a list,
    # so the input iterable is consumed exactly once and can then be
    # traversed several times below.
    sorted_candles = sorted(
        (c for c in candles if c.date is not None),
        key=lambda candle: candle.date,
    )
    if sorted_candles:
        return Candle(
            sorted_candles[-1].date,                        # last timestamp
            sorted_candles[0].open,                         # first open
            max(candle.high for candle in sorted_candles),  # highest high
            min(candle.low for candle in sorted_candles),   # lowest low
            sorted_candles[-1].close,                       # last close
            sum((candle.volume for candle in sorted_candles), 0.0),
        )
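A quick self-contained check of this combine step (the real Candle class is not shown in this thread, so a plain namedtuple stands in for it, and the function body is restated compactly):

```python
from collections import namedtuple

# Hypothetical record type; the real Candle class is not shown in the thread.
Candle = namedtuple("Candle", "date open high low close volume")

def resample_function(candles):
    """OHLCV aggregation: first open, last close, extreme high/low, volume sum."""
    sorted_candles = sorted(
        (c for c in candles if c.date is not None),
        key=lambda c: c.date,
    )
    if sorted_candles:
        return Candle(
            date=sorted_candles[-1].date,
            open=sorted_candles[0].open,
            high=max(c.high for c in sorted_candles),
            low=min(c.low for c in sorted_candles),
            close=sorted_candles[-1].close,
            volume=sum((c.volume for c in sorted_candles), 0.0),
        )

minute_candles = [
    Candle(date=1, open=10.0, high=11.0, low=9.0, close=10.5, volume=100.0),
    Candle(date=2, open=10.5, high=12.0, low=10.0, close=11.5, volume=150.0),
]
print(resample_function(minute_candles))
# Candle(date=2, open=10.0, high=12.0, low=9.0, close=11.5, volume=250.0)
```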


The pipeline seems to be stuck on the GroupByKey inside the
CombineGlobally PTransform, before my resample_function is even called (if
the GCP web interface is accurate).

I also tried keeping only native Python types in the pipeline, using
CountCombineFn, and it is still stuck.

Here is what I can see in my GCP console (this screenshot shows 36
minutes, but I waited for 5 hours to be sure):
[image: Selection_070.png]



Re: Resampling a time series stuck on a GroupByKey

Posted by Lukasz Cwik <lc...@google.com>.
I have invited you to the slack channel.

2 million data points doesn't seem like it should be an issue.
Have you considered trying a simpler combiner like Count to see if the
bottleneck is with the combiner that you are supplying?
Also, could you share the code for what resample_function does?
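The CombineFn lifecycle that a combiner like Count follows can be sketched without Beam at all (the method names mirror Beam's CombineFn interface, but this is an illustration, not the library class). Counting accumulators are single integers, so they are cheap to merge, which makes Count a good probe for whether the bottleneck is the user-supplied combiner:

```python
class CountFn:
    """Plain-Python sketch of a Beam-style CombineFn that counts elements."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + 1

    def merge_accumulators(self, accumulators):
        # In Beam, partial counts from different workers are merged here.
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator

fn = CountFn()
acc = fn.create_accumulator()
for element in ["a", "b", "c"]:
    acc = fn.add_input(acc, element)
merged = fn.merge_accumulators([acc, 2])  # e.g. a second worker counted 2
print(fn.extract_output(merged))  # 5
```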
